Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: Internalize FNS logic to stage FinalizeBlock events #2399

Merged
merged 3 commits into from
Sep 30, 2024

Conversation

teddyding
Copy link
Contributor

@teddyding teddyding commented Sep 27, 2024

Changelist

Context
The FNS manager interface now only includes SendXXXXUpdate, and decides whether to stage or send directly based on the input context.

Test Plan

Testing on feature branch feat/full-node-streaming

Author/Reviewer Checklist

  • If this PR has changes that result in a different app state given the same prior state and transaction list, manually add the state-breaking label.
  • If the PR has breaking postgres changes to the indexer add the indexer-postgres-breaking label.
  • If this PR isn't state-breaking but has changes that modify behavior in PrepareProposal or ProcessProposal, manually add the label proposal-breaking.
  • If this PR is one of many that implement a specific feature, manually label them all feature:[feature-name].
  • If you wish to for mergify-bot to automatically create a PR to backport your change to a release branch, manually add the label backport/[branch-name].
  • Manually add any of the following labels: refactor, chore, bug.

Summary by CodeRabbit

  • New Features

    • Introduced support for order book updates in the event handling system.
    • Added a new method for sending subaccount updates.
  • Changes

    • Updated method signatures to streamline the handling of order book fill updates, now focused on single updates instead of batches.
    • Simplified context management across various streaming manager methods.

These enhancements improve the efficiency and clarity of order book and subaccount update processes.

@teddyding teddyding marked this pull request as ready for review September 27, 2024 19:59
@teddyding teddyding requested a review from a team as a code owner September 27, 2024 19:59
Copy link
Contributor

coderabbitai bot commented Sep 27, 2024

Walkthrough

The pull request introduces significant modifications across various files, primarily focusing on the handling of order book fill updates and subaccount updates. Key changes include the addition of a new field in the StagedFinalizeBlockEvent message to accommodate order book updates, renaming methods to reflect a shift from handling multiple updates to single updates, and adjustments to method signatures to streamline context management. These changes enhance the clarity and functionality of the code related to event streaming and updates.

Changes

Files Change Summary
proto/dydxprotocol/clob/streaming.proto Added a new field StreamOrderbookUpdate orderbook_update = 3; in StagedFinalizeBlockEvent to handle order book updates.
protocol/mocks/MemClobKeeper.go Renamed SendOrderbookFillUpdates to SendOrderbookFillUpdate and changed its parameter from a slice to a single instance of StreamOrderbookFill.
protocol/streaming/full_node_streaming_manager.go Renamed and modified methods to streamline handling of updates, integrating StageFinalizeBlockFill into SendOrderbookUpdates and adjusting for context management.
protocol/streaming/noop_streaming_manager.go Updated method signatures to replace execMode with ctx, renamed SendOrderbookFillUpdates to SendOrderbookFillUpdate, and removed StageFinalizeBlockFill.
protocol/streaming/types/interface.go Updated the FullNodeStreamingManager interface to reflect the changes in method signatures and parameters.
protocol/testutil/memclob/keeper.go Renamed SendOrderbookFillUpdates to SendOrderbookFillUpdate, changing the parameter from a slice to a single instance.
protocol/x/clob/keeper/keeper.go Renamed SendOrderbookFillUpdates to SendOrderbookFillUpdate and adjusted internal logic accordingly.
protocol/x/clob/keeper/process_operations.go Replaced calls to StageFinalizeBlockFill and SendOrderbookFillUpdates with SendOrderbookFillUpdate, simplifying parameters.
protocol/x/clob/memclob/memclob.go Updated the method call in mustUpdateMemclobStateWithMatches to use SendOrderbookFillUpdate instead of SendOrderbookFillUpdates.
protocol/x/clob/types/mem_clob_keeper.go Renamed SendOrderbookFillUpdates to SendOrderbookFillUpdate, changing the parameter from a slice to a single instance.
protocol/x/subaccounts/keeper/subaccount.go Replaced StageFinalizeBlockSubaccountUpdate with SendSubaccountUpdate, maintaining the same parameters.

Possibly related PRs

Suggested reviewers

  • jonfung-dydx
  • dydxwill

Poem

🐇 In the code where rabbits play,
Order fills now hop away.
Single updates, swift and bright,
Streaming changes, pure delight!
With every tweak, we make it right,
Hopping forward, day and night! 🌟


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 1fb2b95 and 804b7d2.

📒 Files selected for processing (1)
  • protocol/streaming/full_node_streaming_manager.go (14 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • protocol/streaming/full_node_streaming_manager.go

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (8)
protocol/streaming/types/interface.go (1)

Line range hint 1-72: Summary of changes to FullNodeStreamingManager interface

The changes to this interface align well with the PR objective of internalizing logic for FinalizeBlock events. Key observations:

  1. Consistent replacement of execMode sdk.ExecMode with ctx sdk.Context, providing more flexibility and information to the methods.
  2. Shift towards handling individual updates (e.g., SendOrderbookFillUpdate) rather than batches in some cases.
  3. Removal of StageFinalizeBlockFill and addition of SendSubaccountUpdate, suggesting a restructuring of FinalizeBlock event handling.

These changes appear to improve the interface's flexibility and granularity. However, please ensure that:

  1. The performance implications of processing individual updates instead of batches have been considered.
  2. The functionality previously handled by StageFinalizeBlockFill is adequately covered in the new structure.
  3. All parts of the codebase that interact with this interface have been updated to reflect these changes.

Consider documenting these significant interface changes, especially the rationale behind moving from batch to individual processing and the new approach to handling FinalizeBlock events. This will help maintain the codebase's clarity and assist future developers in understanding the design decisions.

protocol/streaming/noop_streaming_manager.go (3)

35-35: LGTM. Consider adding a comment explaining the noop behavior.

The change from execMode sdk.ExecMode to ctx sdk.Context is appropriate and consistent with the PR objectives. It provides more flexibility and access to additional context information.

Consider adding a comment explaining why this method does nothing, as it might not be immediately clear to other developers why a "noop" implementation is used here. For example:

// SendOrderbookUpdates is a no-op implementation as part of the NoopGrpcStreamingManager.

88-91: LGTM. Consider adding a comment for clarity.

The rename from StageFinalizeBlockSubaccountUpdate to SendSubaccountUpdate is appropriate and aligns with the PR objectives of internalizing logic for FinalizeBlock events.

Consider adding a comment to explain the purpose of this method and its no-op nature:

// SendSubaccountUpdate is a no-op implementation for sending subaccount updates
// as part of the NoopGrpcStreamingManager.

Line range hint 1-103: Overall changes look good. Consider updating documentation.

The changes in this file are consistent with the PR objectives of internalizing logic for FinalizeBlock events. The shift from execMode to ctx and the renaming of methods improve flexibility and clarity.

Consider updating the package-level documentation or README to reflect these changes in the event handling flow, especially:

  1. The removal of the StageFinalizeBlockFill method.
  2. The renaming of methods from "stage" to "send" (e.g., StageFinalizeBlockSubaccountUpdate to SendSubaccountUpdate).
  3. The change in SendOrderbookFillUpdate to handle individual updates instead of batches.

This will help other developers understand the new architecture and event flow more easily.

protocol/x/clob/keeper/process_operations.go (3)

563-567: LGTM! Consider extracting common parameters.

The change from StageFinalizeBlockFill to SendOrderbookFillUpdate looks good and appears to provide more detailed information for the update. This change is consistent with the modifications in other functions.

Consider extracting the common parameters (uint32(ctx.BlockHeight()), ctx, k.PerpetualIdToClobPairId) into a separate struct or function to reduce duplication across similar function calls in this file.


850-852: LGTM, but consider aligning with other functions for consistency.

The change to use SendOrderbookFillUpdate is consistent with the overall refactoring. However, unlike the other modified functions, this one doesn't include the additional parameters (block height, context, and perpetual ID to CLOB pair ID mapping).

For consistency and to potentially future-proof this function, consider aligning the SendOrderbookFillUpdate call with the implementation in PersistMatchOrdersToState and PersistMatchLiquidationToState. This might involve adding the missing parameters:

k.SendOrderbookFillUpdate(
    streamOrderbookFill,
    uint32(ctx.BlockHeight()),
    ctx,
    k.PerpetualIdToClobPairId,
)

If these parameters are not needed in this context, it might be worth adding a comment explaining why to prevent future confusion.


563-567: Overall LGTM. Consider addressing consistency and potential refactoring.

The changes in this file primarily involve replacing StageFinalizeBlockFill with SendOrderbookFillUpdate across multiple functions. This refactoring appears to improve the handling of orderbook fill updates by providing more detailed information.

To further improve the code:

  1. Consider extracting the common parameters (uint32(ctx.BlockHeight()), ctx, k.PerpetualIdToClobPairId) into a separate struct or function to reduce duplication.
  2. Align the implementation in PersistMatchDeleveragingToState with the other functions for consistency, or add a comment explaining the difference if it's intentional.
  3. If this change is part of a larger refactoring effort, ensure that all related parts of the codebase are updated consistently.

These improvements will enhance code maintainability and reduce the likelihood of future inconsistencies or errors.

Also applies to: 675-679, 850-852

protocol/streaming/full_node_streaming_manager.go (1)

610-612: TODO: Implement missing logic

There is a // TODO comment here indicating that some implementation is missing. It's important to address this to ensure the functionality is complete.

Would you like me to help implement the missing logic or open a GitHub issue to track this task?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 0a3b3b8 and 2e7d9c9.

⛔ Files ignored due to path filters (1)
  • protocol/x/clob/types/streaming.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (11)
  • proto/dydxprotocol/clob/streaming.proto (1 hunks)
  • protocol/mocks/MemClobKeeper.go (1 hunks)
  • protocol/streaming/full_node_streaming_manager.go (3 hunks)
  • protocol/streaming/noop_streaming_manager.go (2 hunks)
  • protocol/streaming/types/interface.go (2 hunks)
  • protocol/testutil/memclob/keeper.go (1 hunks)
  • protocol/x/clob/keeper/keeper.go (1 hunks)
  • protocol/x/clob/keeper/process_operations.go (3 hunks)
  • protocol/x/clob/memclob/memclob.go (1 hunks)
  • protocol/x/clob/types/mem_clob_keeper.go (1 hunks)
  • protocol/x/subaccounts/keeper/subaccount.go (1 hunks)
🔇 Additional comments (17)
protocol/streaming/types/interface.go (4)

37-41: Approve changes to SendOrderbookFillUpdate method, but consider performance implications.

The method has been renamed and modified to handle individual orderbook fills instead of a batch. This change allows for more granular control and potentially easier error handling. The replacement of execMode with ctx sdk.Context is consistent with other changes and provides more flexibility.

However, please consider the performance implications of processing individual fills instead of batches. Can you provide insights into why this change was made and if there are any potential performance concerns?

To verify the usage of this updated method, please run the following script:

#!/bin/bash
# Description: Verify all calls to SendOrderbookFillUpdate use the new signature with individual fill and sdk.Context

# Test: Search for function calls to SendOrderbookFillUpdate. Expect: Only occurrences of the new signature.
rg --type go -A 5 $'SendOrderbookFillUpdate\('

Line range hint 1-1: Clarify handling of FinalizeBlock fill events after removal of StageFinalizeBlockFill.

The StageFinalizeBlockFill method has been removed from the interface. This aligns with the PR objective of internalizing logic for FinalizeBlock events. However, it's important to ensure that the functionality provided by this method is either no longer needed or has been replaced elsewhere.

Could you please clarify how the functionality previously handled by StageFinalizeBlockFill is now being managed? Are there any other parts of the codebase that need to be updated due to this removal?

To help verify the impact of this removal, please run the following script:

#!/bin/bash
# Description: Search for any remaining references to StageFinalizeBlockFill

# Test: Search for any occurrences of StageFinalizeBlockFill. Expect: No results.
rg --type go $'StageFinalizeBlockFill'

53-56: Approve addition of SendSubaccountUpdate method.

The new SendSubaccountUpdate method is a welcome addition to the interface. It's consistent with the use of sdk.Context in other methods and appears to be part of the restructuring to internalize FinalizeBlock event logic.

To ensure proper implementation and usage of this new method, please run the following script:

#!/bin/bash
# Description: Verify the implementation and usage of the new SendSubaccountUpdate method

# Test 1: Search for the implementation of SendSubaccountUpdate
echo "Searching for SendSubaccountUpdate implementation:"
rg --type go -A 10 $'func \(.*\) SendSubaccountUpdate\('

# Test 2: Search for calls to SendSubaccountUpdate
echo "Searching for SendSubaccountUpdate usage:"
rg --type go -A 5 $'SendSubaccountUpdate\('

Please review the implementation to ensure it correctly handles the subaccount update logic, and verify that it's being called appropriately throughout the codebase.


35-35: Approve change to SendOrderbookUpdates method signature.

The replacement of execMode sdk.ExecMode with ctx sdk.Context provides more flexibility and access to additional information that might be needed for sending orderbook updates. This change aligns with the PR objective of internalizing logic for FinalizeBlock events.

To ensure consistency across the codebase, please run the following script to verify the usage of this method:

✅ Verification successful

Apologies for the confusion in the previous script. Please use the corrected script below to verify all calls to SendOrderbookUpdates utilize the new signature with sdk.Context:


Verification Successful: All SendOrderbookUpdates Calls Updated Appropriately

All instances of SendOrderbookUpdates across the codebase now utilize the new signature with sdk.Context, ensuring consistency and alignment with the PR objectives.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify all calls to SendOrderbookUpdates use the new signature with sdk.Context

# Test: Search for function calls to SendOrderbookUpdates. Expect: Only occurrences of the new signature.
rg --type go -A 5 $'SendOrderbookUpdates\('

Length of output: 3828


Script:

#!/bin/bash
# Description: Verify all calls to SendOrderbookUpdates use the new signature with sdk.Context

# Test: Search for function calls to SendOrderbookUpdates. Expect: Only occurrences of the new signature.
rg --type go -A 5 'SendOrderbookUpdates\('

Length of output: 5901

protocol/streaming/noop_streaming_manager.go (1)

39-44: LGTM. Consider updating related code for consistency.

The changes to this method, including the rename and parameter updates, are appropriate and align with the PR objectives. The shift from batch processing to individual processing of order book fills may improve granularity and flexibility.

To ensure consistency across the codebase, please run the following script to check for any remaining references to the old method name:

If any results are found, consider updating those references to use the new method name and signature.

protocol/x/clob/types/mem_clob_keeper.go (1)

105-108: Approve the change with considerations.

The method signature change from SendOrderbookFillUpdates to SendOrderbookFillUpdate simplifies the interface by handling single updates instead of batches. This change is acceptable, but there are some considerations:

  1. This change may impact the performance and behavior of the system, potentially requiring more frequent method calls for multiple updates.
  2. Ensure that all implementations and callers of this method are updated accordingly.

To verify the impact and consistency of this change, please run the following script:

Consider adding batch processing capabilities if handling multiple updates efficiently is still a requirement for the system.

protocol/x/clob/keeper/keeper.go (1)

317-327: LGTM: Method refactored to handle single orderbook fill.

The changes to SendOrderbookFillUpdate look good. The method has been refactored to handle a single orderbook fill instead of a batch, which aligns with the PR objective of internalizing logic to stage FinalizeBlock events.

To ensure this change doesn't break existing functionality, please verify:

  1. All callers of this method have been updated to pass a single StreamOrderbookFill.
  2. The FullNodeStreamingManager interface has been updated to accept a single fill.

Run the following script to check for any remaining calls to the old method signature:

protocol/testutil/memclob/keeper.go (1)

511-514: Method signature updated to handle single orderbook fill

The SendOrderbookFillUpdates method has been renamed to SendOrderbookFillUpdate, and its parameter type has changed from a slice of types.StreamOrderbookFill to a single types.StreamOrderbookFill. This change aligns with processing individual orderbook fills rather than batches.

To ensure this change is consistent across the codebase, please run the following script:

This script will help identify any places where the old method name or parameter type is still being used, ensuring consistency with the new implementation.

✅ Verification successful

Verification successful: No remaining references found

All references to SendOrderbookFillUpdates and []types.StreamOrderbookFill have been removed or updated accordingly.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any remaining references to SendOrderbookFillUpdates and usages of []types.StreamOrderbookFill

# Search for any remaining references to SendOrderbookFillUpdates
echo "Searching for SendOrderbookFillUpdates references:"
rg --type go "SendOrderbookFillUpdates"

# Search for any remaining usages of []types.StreamOrderbookFill
echo "Searching for []types.StreamOrderbookFill usages:"
rg --type go "\[\]types\.StreamOrderbookFill"

Length of output: 307

protocol/mocks/MemClobKeeper.go (1)

418-420: LGTM! Verify usage across the codebase.

The change from SendOrderbookFillUpdates to SendOrderbookFillUpdate and the corresponding parameter change from a slice to a single StreamOrderbookFill looks good. This shift aligns with the PR objective of internalizing logic for FinalizeBlock events, potentially allowing for more granular control over orderbook fill updates.

To ensure this change doesn't break existing functionality, please run the following script to check for any remaining usages of the old method name:

protocol/x/subaccounts/keeper/subaccount.go (2)

Line range hint 1-451: Ensure comprehensive testing of the subaccount update process.

While the change is isolated to the UpdateSubaccounts function, it's crucial to verify that this modification doesn't introduce any unintended side effects in the overall subaccount update process.

To ensure the change doesn't affect other parts of the system, please run the following tests:

#!/bin/bash
# Run all tests related to subaccount updates
go test ./... -run "TestSubaccountUpdate|TestUpdateSubaccounts"

# Check for any integration or end-to-end tests that involve subaccount updates
fd -e go | xargs grep -l "UpdateSubaccounts" | xargs grep -l "func Test"

Additionally, consider adding or updating integration tests to specifically cover the new SendSubaccountUpdate functionality in various scenarios.


448-451: LGTM! Verify related changes for consistency.

The change from StageFinalizeBlockSubaccountUpdate to SendSubaccountUpdate aligns with the PR objective of internalizing the logic for staging FinalizeBlock events. This modification appears to maintain the existing functionality while potentially improving the directness and efficiency of sending subaccount updates.

To ensure consistency across the codebase, please run the following script to check for any other occurrences of StageFinalizeBlockSubaccountUpdate that might need similar updates:

protocol/x/clob/keeper/process_operations.go (1)

675-679: LGTM! Consistent with previous changes.

The modification here is consistent with the changes in PersistMatchOrdersToState, replacing StageFinalizeBlockFill with SendOrderbookFillUpdate and adding the same new parameters.

This change reinforces the suggestion to consider extracting these common parameters into a separate struct or function to improve code maintainability.

protocol/x/clob/memclob/memclob.go (1)

405-405: LGTM! Consider verifying the SendOrderbookFillUpdate method signature.

The change from SendOrderbookFillUpdates to SendOrderbookFillUpdate simplifies the method call by passing a single orderbookMatchFill object instead of a slice. This is likely to improve performance slightly by avoiding the creation of a temporary slice.

To ensure consistency, please run the following command to verify the signature of the SendOrderbookFillUpdate method:

protocol/streaming/full_node_streaming_manager.go (4)

398-400: Logic Update Approved: Early return when not in DeliverTx mode

The addition of the check for !lib.IsDeliverTxMode(ctx) ensures that subaccount updates are only staged during DeliverTx mode, preventing optimistic updates from being streamed inadvertently. This is a sensible safeguard.


618-627: Logic Update Approved: Stream optimistic updates directly when not in DeliverTx mode

The added logic correctly handles optimistic updates by streaming them directly when not in DeliverTx mode, which aligns with the intended behavior.


629-639: ⚠️ Potential issue

Handle errors instead of panicking

Similar to an earlier point, the use of panic(err) is present in this section when handling errors. Consider modifying the error handling to avoid panics.

Apply this diff to handle the error properly:

 func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate(
     orderbookFill clobtypes.StreamOrderbookFill,
     blockHeight uint32,
     ctx sdk.Context,
     perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
 ) {
     // ...
     v1updates, err := streaming_util.GetOffchainUpdatesV1(offchainUpdates)
     if err != nil {
-        panic(err)
+        sm.logger.Error("Failed to get offchain updates V1", "error", err)
+        return
     }
     // ...
 }

Likely invalid or redundant comment.


394-396: Function Rename: Ensure consistency across the codebase

The method StageFinalizeBlockSubaccountUpdate has been renamed to SendSubaccountUpdate. Please verify that all references to the old method name have been updated to prevent any runtime errors.

You can run the following script to check for any lingering references to StageFinalizeBlockSubaccountUpdate:

✅ Verification successful

All references to StageFinalizeBlockSubaccountUpdate have been successfully updated to SendSubaccountUpdate across the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find references to the old method name in the codebase.
# Expected result: No occurrences of `StageFinalizeBlockSubaccountUpdate`.

rg --type go --fixed-strings 'StageFinalizeBlockSubaccountUpdate'

Length of output: 65


Script:

#!/bin/bash
# Comprehensive search for any references to 'StageFinalizeBlockSubaccountUpdate' across all file types.
# Expected result: No occurrences of 'StageFinalizeBlockSubaccountUpdate' in the codebase.

rg --fixed-strings 'StageFinalizeBlockSubaccountUpdate'

Length of output: 55


Script:

#!/bin/bash
# Case-insensitive search for 'StageFinalizeBlockSubaccountUpdate' across all file types.
rg -i --fixed-strings 'StageFinalizeBlockSubaccountUpdate'

# Search for dynamic references to 'StageFinalizeBlockSubaccountUpdate', such as within strings or concatenations.
rg --regexp 'StageFinalizeBlockSubaccountUpdate|StageFinalizeBlock' 

# Search within documentation and comments for references to the old method name.
rg --type markdown --type go --type txt --type doc --regexp 'StageFinalizeBlockSubaccountUpdate'

Length of output: 253


Script:

#!/bin/bash
# Case-insensitive search for 'StageFinalizeBlockSubaccountUpdate' across all file types.
rg -i --fixed-strings 'StageFinalizeBlockSubaccountUpdate'

# Search for dynamic references to 'StageFinalizeBlockSubaccountUpdate', such as within strings or concatenations.
rg --regexp 'StageFinalizeBlockSubaccountUpdate|StageFinalizeBlock'

# Search within documentation and comments for references to the old method name.
rg --type markdown --type go --type txt --regexp 'StageFinalizeBlockSubaccountUpdate'

Length of output: 214

@@ -12,5 +12,6 @@ message StagedFinalizeBlockEvent {
oneof event {
StreamOrderbookFill order_fill = 1;
dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2;
StreamOrderbookUpdate orderbook_update = 3;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Missing handling for orderbook_update in event switches

The new StreamOrderbookUpdate orderbook_update = 3; has been added to the StagedFinalizeBlockEvent message, but the corresponding handling in switch statements is missing. Please update the following file to handle the new event type:

  • protocol/streaming/full_node_streaming_manager.go: Add a case for *clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate in the switch statement handling StagedFinalizeBlockEvent.Event.
🔗 Analysis chain

LGTM! Verify related code updates.

The addition of StreamOrderbookUpdate orderbook_update = 3; to the StagedFinalizeBlockEvent message is well-structured and maintains backward compatibility. This change aligns with the PR objective of internalizing logic to stage FinalizeBlock events.

To ensure comprehensive implementation, please verify that all code handling StagedFinalizeBlockEvent has been updated to account for this new event type. Run the following script to identify potentially affected areas:

Review the output to ensure all relevant code paths have been updated to handle the new orderbook_update event type.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find usages of StagedFinalizeBlockEvent in the codebase

# Search for StagedFinalizeBlockEvent usage
echo "Searching for StagedFinalizeBlockEvent usage:"
rg --type go -A 5 "StagedFinalizeBlockEvent"

# Search for switch or if statements handling event types
echo "Searching for switch or if statements handling event types:"
rg --type go -A 10 -e "switch.*event" -e "if.*event\s*:?=\s*"

Length of output: 24292

Comment on lines 548 to 550
if err != nil {
panic(err)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle errors instead of panicking

The use of panic(err) for error handling may not be appropriate in production code, as it can cause the application to crash. Consider returning the error or handling it gracefully to improve the robustness of the application.

Apply this diff to handle the error properly:

 func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
     offchainUpdates *clobtypes.OffchainUpdates,
     blockHeight uint32,
     ctx sdk.Context,
 ) {
     // ...
     v1updates, err := streaming_util.GetOffchainUpdatesV1(offchainUpdates)
     if err != nil {
-        panic(err)
+        sm.logger.Error("Failed to get offchain updates V1", "error", err)
+        return
     }
     // ...
 }

Committable suggestion was skipped due to low confidence.

// execution (once optimistically and optionally once on the canonical block),
// we need to stage the events in transient store and later emit them
// during `Precommit`.
func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Folded into SendOrderbookFillUpdate

v1updates := make([]ocutypes.OffChainUpdateV1, 0)
for _, message := range offchainUpdates.Messages {
var update ocutypes.OffChainUpdateV1
err := proto.Unmarshal(message.Message.Value, &update)
if err != nil {
return nil, err
panic(fmt.Sprintf("Failed to get OffchainUpdatesV1: %v", err))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upstream always panics anyways; not returning error for simplicity

@@ -501,28 +483,37 @@ func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes.
}

func getStreamUpdatesFromOffchainUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
v1updates []ocutypes.OffChainUpdateV1,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switch to []ocutypes.OffChainUpdateV1 so both it could be used by both here and here

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Outside diff range and nitpick comments (3)
protocol/streaming/full_node_streaming_manager.go (3)

Line range hint 393-411: LGTM: Improved SendSubaccountUpdate function

The changes to this function are well-structured and improve its functionality:

  1. The function name is now more generic, allowing for broader use.
  2. The DeliverTx mode check ensures that updates are only staged when appropriate.
  3. Early return for non-DeliverTx modes prevents unnecessary processing.

One minor suggestion:

Consider adding a comment explaining why optimistic subaccount updates are not streamed in non-DeliverTx mode. This would improve code readability and maintainability.

 // If not `DeliverTx`, return since we don't stream optimistic subaccount updates.
 if !lib.IsDeliverTxMode(ctx) {
+    // Optimistic updates are not streamed to ensure consistency with the finalized state
     return
 }

536-568: LGTM: Improved SendOrderbookUpdates with context-aware processing

The changes to this function are well-structured and improve its functionality:

  1. Using sdk.Context provides more flexibility and access to additional information.
  2. The split between optimistic and consensus update handling is clear and logical.
  3. Staging events for consensus updates aligns with the overall design for handling DeliverTx mode updates.

Suggestion for improvement:

Consider extracting the logic for creating and staging the event into a separate helper function. This would improve readability and potentially allow for reuse in similar scenarios.

+func (sm *FullNodeStreamingManagerImpl) stageOrderbookUpdate(ctx sdk.Context, v1updates []ocutypes.OffChainUpdateV1) {
+    stagedEvent := clobtypes.StagedFinalizeBlockEvent{
+        Event: &clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate{
+            OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
+                Updates:  v1updates,
+                Snapshot: false,
+            },
+        },
+    }
+    sm.stageFinalizeBlockEvent(ctx, sm.cdc.MustMarshal(&stagedEvent))
+}

 // In the SendOrderbookUpdates function
 if lib.IsDeliverTxMode(ctx) {
-    stagedEvent := clobtypes.StagedFinalizeBlockEvent{
-        Event: &clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate{
-            OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
-                Updates:  v1updates,
-                Snapshot: false,
-            },
-        },
-    }
-    sm.stageFinalizeBlockEvent(
-        ctx,
-        sm.cdc.MustMarshal(&stagedEvent),
-    )
+    sm.stageOrderbookUpdate(ctx, v1updates)
 }

Line range hint 972-1004: LGTM: Updated getStagedEventsFromFinalizeBlock to handle orderbook updates

The changes to this function are well-implemented and consistent with the overall updates:

  1. The addition of finalizedOrderbookUpdates as a return value completes the set of finalized event types.
  2. The handling of StagedFinalizeBlockEvent_OrderbookUpdate is correctly implemented.

Suggestion for improvement:

Consider using a map to group the different event types instead of separate slices. This could make the function more scalable and easier to maintain if new event types are added in the future.

 func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock(
     ctx sdk.Context,
 ) (
-    finalizedFills []clobtypes.StreamOrderbookFill,
-    finalizedSubaccountUpdates []satypes.StreamSubaccountUpdate,
-    finalizedOrderbookUpdates []clobtypes.StreamOrderbookUpdate,
+    finalizedEvents map[string]interface{},
 ) {
+    finalizedEvents = make(map[string]interface{})
+    finalizedEvents["fills"] = []clobtypes.StreamOrderbookFill{}
+    finalizedEvents["subaccountUpdates"] = []satypes.StreamSubaccountUpdate{}
+    finalizedEvents["orderbookUpdates"] = []clobtypes.StreamOrderbookUpdate{}
     
     // ... existing code ...
     
     for _, stagedEvent := range stagedEvents {
         switch event := stagedEvent.Event.(type) {
         case *clobtypes.StagedFinalizeBlockEvent_OrderFill:
-            finalizedFills = append(finalizedFills, *event.OrderFill)
+            finalizedEvents["fills"] = append(finalizedEvents["fills"].([]clobtypes.StreamOrderbookFill), *event.OrderFill)
         case *clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate:
-            finalizedSubaccountUpdates = append(finalizedSubaccountUpdates, *event.SubaccountUpdate)
+            finalizedEvents["subaccountUpdates"] = append(finalizedEvents["subaccountUpdates"].([]satypes.StreamSubaccountUpdate), *event.SubaccountUpdate)
         case *clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate:
-            finalizedOrderbookUpdates = append(finalizedOrderbookUpdates, *event.OrderbookUpdate)
+            finalizedEvents["orderbookUpdates"] = append(finalizedEvents["orderbookUpdates"].([]clobtypes.StreamOrderbookUpdate), *event.OrderbookUpdate)
         default:
             panic(fmt.Sprintf("Unhandled staged event type: %v\n", stagedEvent.Event))
         }
     }
     
-    return finalizedFills, finalizedSubaccountUpdates, finalizedOrderbookUpdates
+    return finalizedEvents
 }

This change would make it easier to add new event types in the future without modifying the function signature.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between c4f1566 and ee90970.

⛔ Files ignored due to path filters (1)
  • protocol/x/clob/types/streaming.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (13)
  • indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/streaming.ts (5 hunks)
  • proto/dydxprotocol/clob/streaming.proto (1 hunks)
  • protocol/mocks/MemClobKeeper.go (1 hunks)
  • protocol/streaming/full_node_streaming_manager.go (14 hunks)
  • protocol/streaming/noop_streaming_manager.go (2 hunks)
  • protocol/streaming/types/interface.go (1 hunks)
  • protocol/streaming/util/util.go (1 hunks)
  • protocol/testutil/memclob/keeper.go (1 hunks)
  • protocol/x/clob/keeper/keeper.go (2 hunks)
  • protocol/x/clob/keeper/process_operations.go (3 hunks)
  • protocol/x/clob/memclob/memclob.go (1 hunks)
  • protocol/x/clob/types/mem_clob_keeper.go (1 hunks)
  • protocol/x/subaccounts/keeper/subaccount.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (11)
  • indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/streaming.ts
  • proto/dydxprotocol/clob/streaming.proto
  • protocol/mocks/MemClobKeeper.go
  • protocol/streaming/noop_streaming_manager.go
  • protocol/streaming/types/interface.go
  • protocol/testutil/memclob/keeper.go
  • protocol/x/clob/keeper/keeper.go
  • protocol/x/clob/keeper/process_operations.go
  • protocol/x/clob/memclob/memclob.go
  • protocol/x/clob/types/mem_clob_keeper.go
  • protocol/x/subaccounts/keeper/subaccount.go
🔇 Additional comments (4)
protocol/streaming/full_node_streaming_manager.go (4)

23-24: LGTM: New import added for off-chain update types

The addition of the ocutypes import is consistent with the changes made in the file to handle off-chain updates.


870-885: LGTM: Improved naming and documentation for cacheStreamUpdatesByClobPairWithLock

The changes to this function enhance its clarity and usability:

  1. The new name cacheStreamUpdatesByClobPairWithLock clearly indicates that the function requires a lock, preventing potential misuse.
  2. The added comments provide valuable context about the function's purpose and the assumption of an acquired lock.

These improvements contribute to better code readability and maintainability.


Line range hint 887-902: LGTM: New function cacheStreamUpdatesBySubaccountWithLock added

The addition of cacheStreamUpdatesBySubaccountWithLock is a well-implemented extension of the caching functionality:

  1. It follows the same pattern as cacheStreamUpdatesByClobPairWithLock, maintaining consistency in the codebase.
  2. The function name clearly indicates its purpose and the requirement for a lock.
  3. The implementation correctly caches subaccount stream updates and their corresponding subscription IDs.

This new function enhances the streaming manager's capability to handle subaccount-specific updates efficiently.


Line range hint 1-1074: Overall assessment: Well-implemented improvements to the FullNodeStreamingManager

The changes made to this file significantly enhance the functionality and efficiency of the FullNodeStreamingManager:

  1. The introduction of context-aware processing in functions like SendSubaccountUpdate, SendOrderbookUpdates, and SendOrderbookFillUpdate improves flexibility and allows for better handling of different execution modes.

  2. The refactoring of update handling, particularly the separation of optimistic and consensus updates, aligns well with the system's architecture and improves clarity.

  3. The addition of new functions like cacheStreamUpdatesBySubaccountWithLock and the modifications to existing ones like getStreamUpdatesFromOffchainUpdates demonstrate a thoughtful approach to extending the manager's capabilities.

  4. The improvements in error handling and the addition of more detailed comments contribute to better code maintainability.

These changes appear to be part of a larger effort to optimize the streaming process and handle various types of updates more efficiently. The consistency in approach across different functions is commendable.

While there are a few minor suggestions for improvements (as noted in individual comments), the overall quality of the changes is high. The refactoring seems to have successfully enhanced the streaming manager's ability to handle different types of updates in a more organized and efficient manner.

Comment on lines 486 to 510
v1updates []ocutypes.OffChainUpdateV1,
blockHeight uint32,
execMode sdk.ExecMode,
) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
for _, message := range offchainUpdates.Messages {
clobPairId := message.OrderId.ClobPairId
if _, ok := updates[clobPairId]; !ok {
updates[clobPairId] = clobtypes.NewOffchainUpdates()
clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1)
for _, v1update := range v1updates {
var clobPairId uint32
switch u := v1update.UpdateMessage.(type) {
case *ocutypes.OffChainUpdateV1_OrderPlace:
clobPairId = u.OrderPlace.Order.OrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderReplace:
clobPairId = u.OrderReplace.OldOrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderRemove:
clobPairId = u.OrderRemove.RemovedOrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderUpdate:
clobPairId = u.OrderUpdate.OrderId.ClobPairId
default:
panic(fmt.Sprintf("Unhandled UpdateMessage type: %v", u))
}
updates[clobPairId].Messages = append(updates[clobPairId].Messages, message)

if _, ok := clobPairIdToV1Updates[clobPairId]; !ok {
clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{}
}
clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

LGTM: Refactored getStreamUpdatesFromOffchainUpdates for improved efficiency

The changes to this function are well-implemented:

  1. The function now works directly with OffChainUpdateV1 types, which is more efficient.
  2. The logic for grouping updates by clob pair id has been updated accordingly.

One suggestion for improvement:

Consider handling the default case in the switch statement more gracefully. Instead of panicking, you could log an error and continue processing other updates. This would make the function more robust in case of unexpected update types.

 default:
-    panic(fmt.Sprintf("Unhandled UpdateMessage type: %v", u))
+    sm.logger.Error("Unhandled UpdateMessage type", "type", fmt.Sprintf("%T", u))
+    continue
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
v1updates []ocutypes.OffChainUpdateV1,
blockHeight uint32,
execMode sdk.ExecMode,
) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
for _, message := range offchainUpdates.Messages {
clobPairId := message.OrderId.ClobPairId
if _, ok := updates[clobPairId]; !ok {
updates[clobPairId] = clobtypes.NewOffchainUpdates()
clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1)
for _, v1update := range v1updates {
var clobPairId uint32
switch u := v1update.UpdateMessage.(type) {
case *ocutypes.OffChainUpdateV1_OrderPlace:
clobPairId = u.OrderPlace.Order.OrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderReplace:
clobPairId = u.OrderReplace.OldOrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderRemove:
clobPairId = u.OrderRemove.RemovedOrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderUpdate:
clobPairId = u.OrderUpdate.OrderId.ClobPairId
default:
panic(fmt.Sprintf("Unhandled UpdateMessage type: %v", u))
}
updates[clobPairId].Messages = append(updates[clobPairId].Messages, message)
if _, ok := clobPairIdToV1Updates[clobPairId]; !ok {
clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{}
}
clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update)
v1updates []ocutypes.OffChainUpdateV1,
blockHeight uint32,
execMode sdk.ExecMode,
) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) {
// Group updates by clob pair id.
clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1)
for _, v1update := range v1updates {
var clobPairId uint32
switch u := v1update.UpdateMessage.(type) {
case *ocutypes.OffChainUpdateV1_OrderPlace:
clobPairId = u.OrderPlace.Order.OrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderReplace:
clobPairId = u.OrderReplace.OldOrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderRemove:
clobPairId = u.OrderRemove.RemovedOrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderUpdate:
clobPairId = u.OrderUpdate.OrderId.ClobPairId
default:
sm.logger.Error("Unhandled UpdateMessage type", "type", fmt.Sprintf("%T", u))
continue
}
if _, ok := clobPairIdToV1Updates[clobPairId]; !ok {
clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{}
}
clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update)

Comment on lines 915 to 958
finalizedFills,
finalizedSubaccountUpdates,
finalizedOrderbookUpdates := sm.getStagedEventsFromFinalizeBlock(ctx)

sm.Lock()
defer sm.Unlock()

// Flush all pending updates, since we want the onchain updates to arrive in a batch.
sm.FlushStreamUpdatesWithLock()

orderbookStreamUpdates, orderbookClobPairIds := getStreamUpdatesFromOffchainUpdates(
orderBookUpdatesToSyncLocalOpsQueue,
// Cache updates to sync local ops queue
sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue),
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds)

// Cache updates for finalized fills.
fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills(
finalizedFills,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
perpetualIdToClobPairId,
)
sm.cacheStreamUpdatesByClobPairWithLock(fillStreamUpdates, fillClobPairIds)

// Cache updates for finalized orderbook updates (e.g. RemoveOrderFillAmount in `EndBlocker`).
for _, finalizedUpdate := range finalizedOrderbookUpdates {
streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(
finalizedUpdate.Updates,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
sm.cacheStreamUpdatesByClobPairWithLock(streamUpdates, clobPairIds)
}

// Finally, cache updates for finalized subaccount updates
subaccountStreamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates(
finalizedSubaccountUpdates,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)

sm.Lock()
defer sm.Unlock()

// Flush all pending updates, since we want the onchain updates to arrive in a batch.
sm.FlushStreamUpdatesWithLock()

sm.addBatchUpdatesToCacheWithLock(
orderbookStreamUpdates,
orderbookClobPairIds,
fillStreamUpdates,
fillClobPairIds,
subaccountStreamUpdates,
subaccountIds,
)
sm.cacheStreamUpdatesBySubaccountWithLock(subaccountStreamUpdates, subaccountIds)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

LGTM: Enhanced StreamBatchUpdatesAfterFinalizeBlock with improved update handling

The changes to this function significantly improve its functionality and organization:

  1. The addition of handling for finalized orderbook updates completes the set of update types processed.
  2. The use of helper functions (getStagedEventsFromFinalizeBlock, getStreamUpdatesFromOffchainUpdates, etc.) improves code readability and maintainability.
  3. The clear separation between different types of updates (sync local, fills, orderbook, subaccount) enhances the function's structure.

Suggestion for improvement:

Consider adding error handling for the calls to helper functions. While they may not return errors currently, adding error checks would make the function more robust to future changes.

 // Cache updates to sync local ops queue
-sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
+sycnLocalUpdates, syncLocalClobPairIds, err := getStreamUpdatesFromOffchainUpdates(
     streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue),
     uint32(ctx.BlockHeight()),
     ctx.ExecMode(),
 )
+if err != nil {
+    sm.logger.Error("Failed to get stream updates for sync local ops queue", "error", err)
+    // Consider how to handle this error (e.g., skip these updates, return from the function, etc.)
+}
 sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds)

Apply similar error handling to other helper function calls in this method.

Committable suggestion was skipped due to low confidence.

Comment on lines 608 to 644
// SendOrderbookFillUpdate groups fills by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate(
orderbookFill clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
time.Now(),
)
// If not `DeliverTx`, then updates are optimistic. Stream them directly.
if !lib.IsDeliverTxMode(ctx) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
time.Now(),
)

streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills(
orderbookFills,
blockHeight,
execMode,
perpetualIdToClobPairId,
)
streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills(
[]clobtypes.StreamOrderbookFill{orderbookFill},
blockHeight,
ctx.ExecMode(),
perpetualIdToClobPairId,
)
sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
return
}

// If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block.
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
OrderFill: &orderbookFill,
},
}

sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
sm.stageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

LGTM: Refactored SendOrderbookFillUpdate for single-fill processing

The changes to this function align well with the improvements made to SendOrderbookUpdates:

  1. Processing a single fill at a time may lead to more efficient and granular updates.
  2. The split between optimistic and consensus update handling is consistent with the overall design.
  3. Using sdk.Context provides more flexibility and consistency with other functions.

Suggestion for improvement:

For consistency with the SendOrderbookUpdates function, consider extracting the logic for creating and staging the event into a separate helper function.

+func (sm *FullNodeStreamingManagerImpl) stageOrderbookFillUpdate(ctx sdk.Context, orderbookFill clobtypes.StreamOrderbookFill) {
+    stagedEvent := clobtypes.StagedFinalizeBlockEvent{
+        Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
+            OrderFill: &orderbookFill,
+        },
+    }
+    sm.stageFinalizeBlockEvent(ctx, sm.cdc.MustMarshal(&stagedEvent))
+}

 // In the SendOrderbookFillUpdate function
 if lib.IsDeliverTxMode(ctx) {
-    stagedEvent := clobtypes.StagedFinalizeBlockEvent{
-        Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
-            OrderFill: &orderbookFill,
-        },
-    }
-
-    sm.stageFinalizeBlockEvent(
-        ctx,
-        sm.cdc.MustMarshal(&stagedEvent),
-    )
+    sm.stageOrderbookFillUpdate(ctx, orderbookFill)
 }

This change would improve code consistency and readability across similar functions.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// SendOrderbookFillUpdate groups fills by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate(
orderbookFill clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
time.Now(),
)
// If not `DeliverTx`, then updates are optimistic. Stream them directly.
if !lib.IsDeliverTxMode(ctx) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
time.Now(),
)
streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills(
orderbookFills,
blockHeight,
execMode,
perpetualIdToClobPairId,
)
streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills(
[]clobtypes.StreamOrderbookFill{orderbookFill},
blockHeight,
ctx.ExecMode(),
perpetualIdToClobPairId,
)
sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
return
}
// If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block.
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
OrderFill: &orderbookFill,
},
}
sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
sm.stageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
)
// SendOrderbookFillUpdate groups fills by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate(
orderbookFill clobtypes.StreamOrderbookFill,
blockHeight uint32,
ctx sdk.Context,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
// If not `DeliverTx`, then updates are optimistic. Stream them directly.
if !lib.IsDeliverTxMode(ctx) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
time.Now(),
)
streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills(
[]clobtypes.StreamOrderbookFill{orderbookFill},
blockHeight,
ctx.ExecMode(),
perpetualIdToClobPairId,
)
sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
return
}
// If `DeliverTx`, updates should be staged to be streamed after consensus finalizese on a block.
sm.stageOrderbookFillUpdate(ctx, orderbookFill)
}
func (sm *FullNodeStreamingManagerImpl) stageOrderbookFillUpdate(ctx sdk.Context, orderbookFill clobtypes.StreamOrderbookFill) {
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
OrderFill: &orderbookFill,
},
}
sm.stageFinalizeBlockEvent(ctx, sm.cdc.MustMarshal(&stagedEvent))
}

Comment on lines +12 to +18
func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) []ocutypes.OffChainUpdateV1 {
v1updates := make([]ocutypes.OffChainUpdateV1, 0)
for _, message := range offchainUpdates.Messages {
var update ocutypes.OffChainUpdateV1
err := proto.Unmarshal(message.Message.Value, &update)
if err != nil {
return nil, err
panic(fmt.Sprintf("Failed to get OffchainUpdatesV1: %v", err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

⚠️ Potential issue

Handle Errors Returned by GetOffchainUpdatesV1 in All Callers

The verification indicates that callers of GetOffchainUpdatesV1 in protocol/streaming/full_node_streaming_manager.go do not handle the returned error. Please update these locations to appropriately handle errors to ensure application robustness.

  • protocol/streaming/full_node_streaming_manager.go
🔗 Analysis chain

Avoid panicking in utility functions; return errors instead

Panicking within GetOffchainUpdatesV1 on unmarshalling failure can cause the entire application to crash unexpectedly. It's better practice to return an error and allow the caller to handle it appropriately, enhancing robustness and flexibility.

Apply this diff to restore error handling:

-func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) []ocutypes.OffChainUpdateV1 {
+func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) ([]ocutypes.OffChainUpdateV1, error) {
    v1updates := make([]ocutypes.OffChainUpdateV1, 0)
    for _, message := range offchainUpdates.Messages {
        var update ocutypes.OffChainUpdateV1
        err := proto.Unmarshal(message.Message.Value, &update)
        if err != nil {
-           panic(fmt.Sprintf("Failed to get OffchainUpdatesV1: %v", err))
+           return nil, fmt.Errorf("failed to unmarshal OffChainUpdateV1: %v", err)
        }
        v1updates = append(v1updates, update)
    }
-   return v1updates
+   return v1updates, nil
}

Run the following script to verify that all callers of GetOffchainUpdatesV1 handle the updated error return:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find all usages of GetOffchainUpdatesV1 and check for error handling.

# Search for function calls to GetOffchainUpdatesV1
rg --type go 'GetOffchainUpdatesV1\(' -A 2

Length of output: 1173

if err != nil {
panic(err)
}
for clobPairId, v1updates := range clobPairIdToV1Updates {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we iterate over maps deterministically here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 538 to 539
blockHeight uint32,
execMode sdk.ExecMode,
ctx sdk.Context,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might as well read block height from ctx? or are you worried about the +1 problem for check tx?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed this function and couple of others to only take in ctx

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (3)
protocol/streaming/full_node_streaming_manager.go (3)

Line range hint 392-412: LGTM: Improved SendSubaccountUpdate function

The changes to this function enhance its consistency and efficiency:

  1. Renaming to SendSubaccountUpdate aligns with other function names.
  2. Using sdk.Context as a parameter simplifies the function signature.
  3. Early return for non-DeliverTx mode prevents unnecessary processing.
  4. Staging events only in DeliverTx mode ensures only finalized updates are staged.

These changes improve the overall design and efficiency of the function.

Consider updating the comment to be more precise:

-// If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block.
+// In `DeliverTx` mode, stage the update to be streamed after consensus finalizes on a block.

621-656: LGTM: Improved SendOrderbookFillUpdate function with suggestion

The changes to this function enhance its functionality and consistency:

  1. Renaming to SendOrderbookFillUpdate and handling a single fill makes the function more focused.
  2. Using sdk.Context as a parameter simplifies the function signature and provides more context.
  3. The differentiation between DeliverTx and non-DeliverTx modes allows for appropriate handling of updates:
    • In non-DeliverTx mode, updates are processed immediately.
    • In DeliverTx mode, updates are staged for later processing.

These changes align well with the modifications made to the SendOrderbookUpdates function, improving overall consistency.

For even better consistency with SendOrderbookUpdates, consider extracting the staging logic into a separate method:

+func (sm *FullNodeStreamingManagerImpl) stageOrderbookFillUpdate(ctx sdk.Context, orderbookFill clobtypes.StreamOrderbookFill) {
+    stagedEvent := clobtypes.StagedFinalizeBlockEvent{
+        Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
+            OrderFill: &orderbookFill,
+        },
+    }
+    sm.stageFinalizeBlockEvent(ctx, sm.cdc.MustMarshal(&stagedEvent))
+}

 // In the SendOrderbookFillUpdate function
 if lib.IsDeliverTxMode(ctx) {
-    stagedEvent := clobtypes.StagedFinalizeBlockEvent{
-        Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
-            OrderFill: &orderbookFill,
-        },
-    }
-
-    sm.stageFinalizeBlockEvent(
-        ctx,
-        sm.cdc.MustMarshal(&stagedEvent),
-    )
+    sm.stageOrderbookFillUpdate(ctx, orderbookFill)
 }

This change would improve code consistency and readability across similar functions.


Line range hint 926-1015: LGTM: Comprehensive improvements to StreamBatchUpdatesAfterFinalizeBlock

The changes to this function significantly enhance its functionality and organization:

  1. Addition of handling for finalized orderbook updates completes the set of update types processed.
  2. Use of helper functions (getStagedEventsFromFinalizeBlock, getStreamUpdatesFromOffchainUpdates, etc.) improves code readability and maintainability.
  3. Clear separation between different types of updates (sync local, fills, orderbook, subaccount) enhances the function's structure.
  4. The function now provides a comprehensive batch update after block finalization, ensuring all relevant updates are processed and cached.

These improvements contribute to a more robust and efficient streaming process after block finalization.

Consider adding error handling for the calls to helper functions. While they may not return errors currently, adding error checks would make the function more robust to future changes:

 // Cache updates to sync local ops queue
-sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
+sycnLocalUpdates, syncLocalClobPairIds, err := getStreamUpdatesFromOffchainUpdates(
     streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue),
     lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
     ctx.ExecMode(),
 )
+if err != nil {
+    sm.logger.Error("Failed to get stream updates for sync local ops queue", "error", err)
+    // Consider how to handle this error (e.g., skip these updates, return from the function, etc.)
+}
 sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds)

Apply similar error handling to other helper function calls in this method.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between ee90970 and 1fb2b95.

📒 Files selected for processing (5)
  • protocol/streaming/full_node_streaming_manager.go (14 hunks)
  • protocol/streaming/noop_streaming_manager.go (2 hunks)
  • protocol/streaming/types/interface.go (1 hunks)
  • protocol/x/clob/keeper/keeper.go (2 hunks)
  • protocol/x/clob/keeper/process_operations.go (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • protocol/streaming/noop_streaming_manager.go
  • protocol/streaming/types/interface.go
  • protocol/x/clob/keeper/keeper.go
  • protocol/x/clob/keeper/process_operations.go
🔇 Additional comments (3)
protocol/streaming/full_node_streaming_manager.go (3)

23-24: LGTM: Import added for off-chain update types

The addition of the import for ocutypes is appropriate given the changes in the function signatures and implementations that now use off-chain update types directly.


547-581: LGTM: Enhanced SendOrderbookUpdates function

The changes to this function significantly improve its functionality and flexibility:

  1. Using sdk.Context as a parameter simplifies the function signature and provides more context.
  2. The differentiation between DeliverTx and non-DeliverTx modes allows for appropriate handling of updates:
    • In non-DeliverTx mode, updates are streamed directly, allowing for immediate processing of optimistic updates.
    • In DeliverTx mode, updates are staged to be streamed later, ensuring that finalized updates are processed after consensus.
  3. The use of lib.MustConvertIntegerToUint32(ctx.BlockHeight()) and ctx.ExecMode() ensures consistent handling of block height and execution mode.

These changes enhance the overall design and efficiency of the function, making it more robust and adaptable to different execution contexts.


Line range hint 881-914: LGTM: Well-implemented caching functions for stream updates

The addition of cacheStreamUpdatesByClobPairWithLock and cacheStreamUpdatesBySubaccountWithLock functions improves the organization and efficiency of update caching:

  1. Clear separation of concerns: Each function handles a specific type of update (clob pairs or subaccounts).
  2. Efficient implementation: The functions directly append updates to the cache and manage the corresponding mappings.
  3. Clear documentation: The comments explicitly state the assumption about lock acquisition, preventing potential misuse.

These functions contribute to a more modular and maintainable codebase.

Comment on lines 486 to 526
v1updates []ocutypes.OffChainUpdateV1,
blockHeight uint32,
execMode sdk.ExecMode,
) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
for _, message := range offchainUpdates.Messages {
clobPairId := message.OrderId.ClobPairId
if _, ok := updates[clobPairId]; !ok {
updates[clobPairId] = clobtypes.NewOffchainUpdates()
clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1)
// unique list of clob pair Ids to send updates for.
clobPairIds = make([]uint32, 0)
for _, v1update := range v1updates {
var clobPairId uint32
switch u := v1update.UpdateMessage.(type) {
case *ocutypes.OffChainUpdateV1_OrderPlace:
clobPairId = u.OrderPlace.Order.OrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderReplace:
clobPairId = u.OrderReplace.OldOrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderRemove:
clobPairId = u.OrderRemove.RemovedOrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderUpdate:
clobPairId = u.OrderUpdate.OrderId.ClobPairId
default:
panic(fmt.Sprintf("Unhandled UpdateMessage type: %v", u))
}
updates[clobPairId].Messages = append(updates[clobPairId].Messages, message)

if _, ok := clobPairIdToV1Updates[clobPairId]; !ok {
clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{}
clobPairIds = append(clobPairIds, clobPairId)
}
clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update)
}

// Unmarshal each per-clob pair message to v1 updates.
streamUpdates = make([]clobtypes.StreamUpdate, 0)
clobPairIds = make([]uint32, 0)
for clobPairId, update := range updates {
v1updates, err := streaming_util.GetOffchainUpdatesV1(update)
if err != nil {
panic(err)
streamUpdates = make([]clobtypes.StreamUpdate, len(clobPairIds))

for _, clobPairId := range clobPairIds {
v1updates, exists := clobPairIdToV1Updates[clobPairId]
if !exists {
panic(fmt.Sprintf(
"clob pair id %v not found in clobPairIdToV1Updates: %v",
clobPairId,
clobPairIdToV1Updates,
))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

LGTM: Improved getStreamUpdatesFromOffchainUpdates function with suggestion

The changes to this function significantly improve its functionality and efficiency:

  1. Using []ocutypes.OffChainUpdateV1 allows for more direct and efficient processing of updates.
  2. Grouping updates by clob pair id and creating a unique list prevents duplicate processing.
  3. The switch statement ensures all update types are handled correctly.

These changes enhance the overall performance and robustness of the function.

However, the use of panic for unhandled update types might be too aggressive for production code. Consider logging an error and continuing processing instead:

 default:
-    panic(fmt.Sprintf("Unhandled UpdateMessage type: %v", u))
+    sm.logger.Error("Unhandled UpdateMessage type", "type", fmt.Sprintf("%T", u))
+    continue

This change would make the function more resilient to unexpected update types without crashing the application.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
v1updates []ocutypes.OffChainUpdateV1,
blockHeight uint32,
execMode sdk.ExecMode,
) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
for _, message := range offchainUpdates.Messages {
clobPairId := message.OrderId.ClobPairId
if _, ok := updates[clobPairId]; !ok {
updates[clobPairId] = clobtypes.NewOffchainUpdates()
clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1)
// unique list of clob pair Ids to send updates for.
clobPairIds = make([]uint32, 0)
for _, v1update := range v1updates {
var clobPairId uint32
switch u := v1update.UpdateMessage.(type) {
case *ocutypes.OffChainUpdateV1_OrderPlace:
clobPairId = u.OrderPlace.Order.OrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderReplace:
clobPairId = u.OrderReplace.OldOrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderRemove:
clobPairId = u.OrderRemove.RemovedOrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderUpdate:
clobPairId = u.OrderUpdate.OrderId.ClobPairId
default:
panic(fmt.Sprintf("Unhandled UpdateMessage type: %v", u))
}
updates[clobPairId].Messages = append(updates[clobPairId].Messages, message)
if _, ok := clobPairIdToV1Updates[clobPairId]; !ok {
clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{}
clobPairIds = append(clobPairIds, clobPairId)
}
clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update)
}
// Unmarshal each per-clob pair message to v1 updates.
streamUpdates = make([]clobtypes.StreamUpdate, 0)
clobPairIds = make([]uint32, 0)
for clobPairId, update := range updates {
v1updates, err := streaming_util.GetOffchainUpdatesV1(update)
if err != nil {
panic(err)
streamUpdates = make([]clobtypes.StreamUpdate, len(clobPairIds))
for _, clobPairId := range clobPairIds {
v1updates, exists := clobPairIdToV1Updates[clobPairId]
if !exists {
panic(fmt.Sprintf(
"clob pair id %v not found in clobPairIdToV1Updates: %v",
clobPairId,
clobPairIdToV1Updates,
))
v1updates []ocutypes.OffChainUpdateV1,
blockHeight uint32,
execMode sdk.ExecMode,
) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) {
// Group updates by clob pair id.
clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1)
// unique list of clob pair Ids to send updates for.
clobPairIds = make([]uint32, 0)
for _, v1update := range v1updates {
var clobPairId uint32
switch u := v1update.UpdateMessage.(type) {
case *ocutypes.OffChainUpdateV1_OrderPlace:
clobPairId = u.OrderPlace.Order.OrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderReplace:
clobPairId = u.OrderReplace.OldOrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderRemove:
clobPairId = u.OrderRemove.RemovedOrderId.ClobPairId
case *ocutypes.OffChainUpdateV1_OrderUpdate:
clobPairId = u.OrderUpdate.OrderId.ClobPairId
default:
sm.logger.Error("Unhandled UpdateMessage type", "type", fmt.Sprintf("%T", u))
continue
}
if _, ok := clobPairIdToV1Updates[clobPairId]; !ok {
clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{}
clobPairIds = append(clobPairIds, clobPairId)
}
clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update)
}
// Unmarshal each per-clob pair message to v1 updates.
streamUpdates = make([]clobtypes.StreamUpdate, len(clobPairIds))
for _, clobPairId := range clobPairIds {
v1updates, exists := clobPairIdToV1Updates[clobPairId]
if !exists {
panic(fmt.Sprintf(
"clob pair id %v not found in clobPairIdToV1Updates: %v",
clobPairId,
clobPairIdToV1Updates,
))

@teddyding teddyding merged commit d583dbc into main Sep 30, 2024
32 of 34 checks passed
@teddyding teddyding deleted the td/fix-fns-stage branch September 30, 2024 18:38
@teddyding teddyding changed the title Internalize logic to stage FinalizeBlock events bug: Internalize FNS logic to stage FinalizeBlock events Sep 30, 2024
@teddyding
Copy link
Contributor Author

@Mergifyio backport release/protocol/v7.x

Copy link
Contributor

mergify bot commented Sep 30, 2024

backport release/protocol/v7.x

✅ Backports have been created

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

3 participants