
[OTE-456] FNS x OE: stage FinalizeBlock events and emit in Precommit #2253

Merged · 5 commits · Sep 16, 2024

Changes from 3 commits
494 changes: 248 additions & 246 deletions indexer/packages/v4-protos/src/codegen/dydxprotocol/bundle.ts

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions indexer/packages/v4-protos/src/codegen/gogoproto/bundle.ts
@@ -1,3 +1,3 @@
-import * as _127 from "./gogo";
-export const gogoproto = { ..._127
+import * as _128 from "./gogo";
+export const gogoproto = { ..._128
 };
22 changes: 11 additions & 11 deletions indexer/packages/v4-protos/src/codegen/google/bundle.ts
@@ -1,16 +1,16 @@
-import * as _128 from "./api/annotations";
-import * as _129 from "./api/http";
-import * as _130 from "./protobuf/descriptor";
-import * as _131 from "./protobuf/duration";
-import * as _132 from "./protobuf/timestamp";
-import * as _133 from "./protobuf/any";
+import * as _129 from "./api/annotations";
+import * as _130 from "./api/http";
+import * as _131 from "./protobuf/descriptor";
+import * as _132 from "./protobuf/duration";
+import * as _133 from "./protobuf/timestamp";
+import * as _134 from "./protobuf/any";
 export namespace google {
-  export const api = { ..._128,
-    ..._129
+  export const api = { ..._129,
+    ..._130
   };
-  export const protobuf = { ..._130,
-    ..._131,
+  export const protobuf = { ..._131,
     ..._132,
-    ..._133
+    ..._133,
+    ..._134
   };
 }
16 changes: 16 additions & 0 deletions proto/dydxprotocol/clob/streaming.proto
@@ -0,0 +1,16 @@
syntax = "proto3";
package dydxprotocol.clob;

import "dydxprotocol/subaccounts/streaming.proto";
Contributor commented:
Fix the missing import file.

The import statement import "dydxprotocol/subaccounts/streaming.proto"; references a file that does not exist, according to the static analysis hints. This could lead to compilation errors.

Please ensure that the file dydxprotocol/subaccounts/streaming.proto exists in the specified location. If you need assistance resolving this issue, I'd be happy to help. Let me know if you'd like me to open a GitHub issue to track this task.

Tools: buf (COMPILE) 4-4: import "dydxprotocol/subaccounts/streaming.proto": file does not exist

import "dydxprotocol/clob/query.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/x/clob/types";

// StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`.
message StagedFinalizeBlockEvent {
// Contains one of StreamOrderbookFill, StreamSubaccountUpdate.
oneof event {
StreamOrderbookFill order_fill = 1;
dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2;
}
}
4 changes: 4 additions & 0 deletions protocol/app/app.go
@@ -469,6 +469,7 @@ func New(
statsmoduletypes.TransientStoreKey,
rewardsmoduletypes.TransientStoreKey,
indexer_manager.TransientStoreKey,
streaming.StreamingManagerTransientStoreKey,
perpetualsmoduletypes.TransientStoreKey,
)
memKeys := storetypes.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, clobmoduletypes.MemStoreKey)
@@ -764,6 +765,7 @@ func New(
appFlags,
appCodec,
logger,
tkeys[streaming.StreamingManagerTransientStoreKey],
)

timeProvider := &timelib.TimeProviderImpl{}
@@ -2059,6 +2061,7 @@ func getFullNodeStreamingManagerFromOptions(
appFlags flags.Flags,
cdc codec.Codec,
logger log.Logger,
streamingManagerTransientStoreKey storetypes.StoreKey,
) (manager streamingtypes.FullNodeStreamingManager, wsServer *ws.WebsocketServer) {
logger = logger.With(log.ModuleKey, "full-node-streaming")
if appFlags.GrpcStreamingEnabled {
@@ -2072,6 +2075,7 @@
appFlags.GrpcStreamingMaxBatchSize,
appFlags.GrpcStreamingMaxChannelBufferSize,
appFlags.FullNodeStreamingSnapshotInterval,
streamingManagerTransientStoreKey,
)

// Start websocket server.
1 change: 1 addition & 0 deletions protocol/lib/metrics/constants.go
@@ -202,6 +202,7 @@ const (
UpdateType = "update_type"
ValidateMatches = "validate_matches"
ValidateOrder = "validate_order"
StreamBatchUpdatesAfterFinalizeBlock = "stream_batch_updates_after_finalize_block"

// MemCLOB.
AddedToOrderBook = "added_to_orderbook"
29 changes: 16 additions & 13 deletions protocol/lib/metrics/metric_keys.go
@@ -70,19 +70,22 @@ const (
GateWithdrawalsIfNegativeTncSubaccountSeenLatency = "gate_withdrawals_if_negative_tnc_subaccount_seen_latency"

// Full node grpc
-	FullNodeGrpc                              = "full_node_grpc"
-	GrpcSendOrderbookUpdatesLatency           = "grpc_send_orderbook_updates_latency"
-	GrpcSendOrderbookSnapshotLatency          = "grpc_send_orderbook_snapshot_latency"
-	GrpcSendSubaccountSnapshotLatency         = "grpc_send_subaccount_snapshot_latency"
-	GrpcSendOrderbookFillsLatency             = "grpc_send_orderbook_fills_latency"
-	GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency"
-	GrpcAddUpdateToBufferCount                = "grpc_add_update_to_buffer_count"
-	GrpcAddToSubscriptionChannelCount         = "grpc_add_to_subscription_channel_count"
-	GrpcSendResponseToSubscriberCount         = "grpc_send_response_to_subscriber_count"
-	GrpcStreamSubscriberCount                 = "grpc_stream_subscriber_count"
-	GrpcStreamNumUpdatesBuffered              = "grpc_stream_num_updates_buffered"
-	GrpcFlushUpdatesLatency                   = "grpc_flush_updates_latency"
-	GrpcSubscriptionChannelLength             = "grpc_subscription_channel_length"
+	FullNodeGrpc                                  = "full_node_grpc"
+	GrpcSendOrderbookUpdatesLatency               = "grpc_send_orderbook_updates_latency"
+	GrpcSendOrderbookSnapshotLatency              = "grpc_send_orderbook_snapshot_latency"
+	GrpcSendSubaccountSnapshotLatency             = "grpc_send_subaccount_snapshot_latency"
+	GrpcSendOrderbookFillsLatency                 = "grpc_send_orderbook_fills_latency"
+	GrpcSendFinalizedSubaccountUpdatesLatency     = "grpc_send_finalized_subaccount_updates_latency"
+	GrpcAddUpdateToBufferCount                    = "grpc_add_update_to_buffer_count"
+	GrpcAddToSubscriptionChannelCount             = "grpc_add_to_subscription_channel_count"
+	GrpcSendResponseToSubscriberCount             = "grpc_send_response_to_subscriber_count"
+	GrpcStreamSubscriberCount                     = "grpc_stream_subscriber_count"
+	GrpcStreamNumUpdatesBuffered                  = "grpc_stream_num_updates_buffered"
+	GrpcFlushUpdatesLatency                       = "grpc_flush_updates_latency"
+	GrpcSubscriptionChannelLength                 = "grpc_subscription_channel_length"
+	GrpcStagedAllFinalizeBlockUpdatesCount        = "grpc_staged_all_finalize_block_updates_count"
+	GrpcStagedFillFinalizeBlockUpdatesCount       = "grpc_staged_finalize_block_fill_updates_count"
+	GrpcStagedSubaccountFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_subaccount_updates_count"

EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
13 changes: 13 additions & 0 deletions protocol/streaming/constants.go
@@ -0,0 +1,13 @@
package streaming

// Constants for FullNodeStreamingManager.
const (
// Transient store key for storing staged events.
StreamingManagerTransientStoreKey = "tmp_streaming"

// Key for storing the count of staged events.
StagedEventsCountKey = "EvtCnt"

// Key prefix for staged events.
StagedEventsKeyPrefix = "Evt:"
)
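
Read together, these keys define a small transient keyspace: one count slot plus one indexed slot per event. Below is a self-contained sketch of the key construction, under the assumption that lib.Uint32ToKey encodes the index as four big-endian bytes (the constants are copied locally so the snippet compiles on its own):

package main

import (
	"encoding/binary"
	"fmt"
)

// Copied from protocol/streaming/constants.go so the sketch stands alone.
const (
	stagedEventsCountKey  = "EvtCnt"
	stagedEventsKeyPrefix = "Evt:"
)

// stagedEventKey builds the transient-store key for the i-th staged event,
// assuming lib.Uint32ToKey is a 4-byte big-endian encoding of the index.
func stagedEventKey(i uint32) []byte {
	idx := make([]byte, 4)
	binary.BigEndian.PutUint32(idx, i)
	return append([]byte(stagedEventsKeyPrefix), idx...)
}

func main() {
	fmt.Printf("count key: %q\n", stagedEventsCountKey) // "EvtCnt"
	fmt.Printf("event #2 key: %x\n", stagedEventKey(2)) // 4576743a00000002, i.e. "Evt:" + 0x00000002
}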
167 changes: 167 additions & 0 deletions protocol/streaming/full_node_streaming_manager.go
@@ -1,15 +1,20 @@
package streaming

import (
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/dydxprotocol/v4-chain/protocol/lib"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
"github.com/dydxprotocol/v4-chain/protocol/streaming/types"
streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util"
@@ -50,6 +55,9 @@ type FullNodeStreamingManagerImpl struct {
// Block interval at which snapshot info should be sent out.
// Defaults to 0, which means only one snapshot will be sent out.
snapshotBlockInterval uint32

// stores the staged FinalizeBlock events for full node streaming.
streamingManagerTransientStoreKey storetypes.StoreKey
}

// OrderbookSubscription represents an active subscription to the orderbook updates stream.
@@ -86,6 +94,7 @@ func NewFullNodeStreamingManager(
maxUpdatesInCache uint32,
maxSubscriptionChannelSize uint32,
snapshotBlockInterval uint32,
streamingManagerTransientStoreKey storetypes.StoreKey,
) *FullNodeStreamingManagerImpl {
fullNodeStreamingManager := &FullNodeStreamingManagerImpl{
logger: logger,
@@ -102,6 +111,8 @@
maxUpdatesInCache: maxUpdatesInCache,
maxSubscriptionChannelSize: maxSubscriptionChannelSize,
snapshotBlockInterval: snapshotBlockInterval,

streamingManagerTransientStoreKey: streamingManagerTransientStoreKey,
}

// Start the goroutine for pushing order updates through.
@@ -367,6 +378,88 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates(
}
}

func getStagedEventsCount(store storetypes.KVStore) uint32 {
countsBytes := store.Get([]byte(StagedEventsCountKey))
if countsBytes == nil {
return 0
}
return binary.BigEndian.Uint32(countsBytes)
}

// Stage a subaccount update event in transient store, during `FinalizeBlock`.
func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
) {
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
}
sm.stageFinalizeBlockEvent(
ctx,
clobtypes.Amino.MustMarshal(stagedEvent),
)
}

// Stage a fill event in transient store, during `FinalizeBlock`.
// Since the `FinalizeBlock` code path can be called more than once with optimistic
// execution (once optimistically and optionally once on the canonical block),
// we need to stage the events in transient store and later emit them
// during `Precommit`.
func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill(
ctx sdk.Context,
fill clobtypes.StreamOrderbookFill,
) {
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
OrderFill: &fill,
},
}
sm.stageFinalizeBlockEvent(
ctx,
clobtypes.Amino.MustMarshal(stagedEvent),
)
}

func getStagedFinalizeBlockEvents(store storetypes.KVStore) []clobtypes.StagedFinalizeBlockEvent {
count := getStagedEventsCount(store)
events := make([]clobtypes.StagedFinalizeBlockEvent, count)
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
for i := uint32(0); i < count; i++ {
var event clobtypes.StagedFinalizeBlockEvent
bytes := store.Get(lib.Uint32ToKey(i))
clobtypes.Amino.MustUnmarshal(bytes, &event)
events[i] = event
}
return events
}

// Retrieve all events staged during `FinalizeBlock`.
func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents(
ctx sdk.Context,
) []clobtypes.StagedFinalizeBlockEvent {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)
return getStagedFinalizeBlockEvents(store)
}

func (sm *FullNodeStreamingManagerImpl) stageFinalizeBlockEvent(
ctx sdk.Context,
eventBytes []byte,
) {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
Contributor commented:
For my understanding, why do we need to add a gas meter to ctx? Because we write to state?

Contributor Author replied:
Yes, any read or write to the store incurs gas. We do the same thing for the indexer.

store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)

// Increment events count.
count := getStagedEventsCount(store)
store.Set([]byte(StagedEventsCountKey), lib.Uint32ToKey(count+1))

// Store events keyed by index.
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
store.Set(lib.Uint32ToKey(count), eventBytes)
}
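
Putting the staging and retrieval paths together, usage composes roughly as in this test-style sketch (ctx, fill, and subaccountUpdate are assumed fixtures, with the streaming manager's transient store key mounted on the context):

// Hypothetical round trip within a single block:
sm.StageFinalizeBlockFill(ctx, fill)                         // stored under "Evt:" + 0x00000000
sm.StageFinalizeBlockSubaccountUpdate(ctx, subaccountUpdate) // stored under "Evt:" + 0x00000001
events := sm.GetStagedFinalizeBlockEvents(ctx)
// len(events) == 2, returned in staging order. Since the store is
// transient, it is reset at commit, so staged events never leak into
// the next block.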

// SendCombinedSnapshot sends messages to a particular subscriber without buffering.
// Note this method requires the lock and assumes that the lock has already been
// acquired by the caller.
@@ -703,6 +796,80 @@ func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams(
return ret
}

// Grpc Streaming logic after consensus agrees on a block.
// - Stream all events staged during `FinalizeBlock`.
// - Stream orderbook updates to sync fills in local ops queue.
func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
ctx sdk.Context,
orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
// Flush all pending updates, since we want the onchain updates to arrive in a batch.
sm.FlushStreamUpdates()

jonfung-dydx (Contributor) commented on Sep 13, 2024:
Thinking out loud, we don't need to sm.Lock() this, right? CheckTx transactions won't run in parallel while we are finalizing the block, so the cache couldn't possibly get new updates while this function is running. It could be the case that we flush on the timer, but that should be fine since the cache is empty after the flush above.

EDIT: I see that the three sm.SendOrderbookUpdates() functions already obtain the lock before adding to the cache; I think it's fine.

Contributor Author replied:
Yes, agreed. In the follow-up PR I will obtain the lock around sending a batch.

finalizedFills, finalizedSubaccountUpdates := sm.getStagedEventsFromFinalizeBlock(ctx)

// TODO(CT-1190): Stream below in a single batch.
Contributor Author commented:
Will do this in a follow-up PR.

Contributor replied:
Probably good to just obtain the lock and manually send the updates through in a single batch, bypassing the cache.

// Send orderbook updates to sync optimistic orderbook onchain state after FinalizeBlock.
sm.SendOrderbookUpdates(
orderBookUpdatesToSyncLocalOpsQueue,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)

// Send finalized fills from FinalizeBlock.
sm.SendOrderbookFillUpdates(
finalizedFills,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
perpetualIdToClobPairId,
)

// Send finalized subaccount updates from FinalizeBlock.
sm.SendFinalizedSubaccountUpdates(
finalizedSubaccountUpdates,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
}

// getStagedEventsFromFinalizeBlock returns staged events from `FinalizeBlock`.
// It should be called after the consensus agrees on a block (e.g. Precommitter).
func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock(
ctx sdk.Context,
) (
finalizedFills []clobtypes.StreamOrderbookFill,
finalizedSubaccountUpdates []satypes.StreamSubaccountUpdate,
) {
// Get onchain stream events stored in transient store.
stagedEvents := sm.GetStagedFinalizeBlockEvents(ctx)

metrics.SetGauge(
metrics.GrpcStagedAllFinalizeBlockUpdatesCount,
float32(len(stagedEvents)),
)

for _, stagedEvent := range stagedEvents {
switch event := stagedEvent.Event.(type) {
case *clobtypes.StagedFinalizeBlockEvent_OrderFill:
finalizedFills = append(finalizedFills, *event.OrderFill)
case *clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate:
finalizedSubaccountUpdates = append(finalizedSubaccountUpdates, *event.SubaccountUpdate)
}
}

metrics.SetGauge(
metrics.GrpcStagedSubaccountFinalizeBlockUpdatesCount,
float32(len(finalizedSubaccountUpdates)),
)
metrics.SetGauge(
metrics.GrpcStagedFillFinalizeBlockUpdatesCount,
float32(len(finalizedFills)),
)

return finalizedFills, finalizedSubaccountUpdates
}
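
This diff does not include the Precommit hook that ultimately calls StreamBatchUpdatesAfterFinalizeBlock. A minimal sketch of that wiring follows; only cosmos-sdk's baseapp SetPrecommiter is a known hook, and the accessor names are illustrative assumptions:

// Sketch: emit staged events once consensus has agreed on the block.
// GetFullNodeStreamingManager and the two argument accessors are
// hypothetical names, not confirmed by this PR.
app.SetPrecommiter(func(ctx sdk.Context) {
	sm := app.ClobKeeper.GetFullNodeStreamingManager()
	sm.StreamBatchUpdatesAfterFinalizeBlock(
		ctx,
		app.ClobKeeper.GetOffchainUpdatesForLocalOpsQueue(ctx), // *clobtypes.OffchainUpdates
		app.ClobKeeper.GetPerpetualIdToClobPairIdMapping(ctx),  // map[uint32][]clobtypes.ClobPairId
	)
})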

func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(
getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates,
subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate,
25 changes: 25 additions & 0 deletions protocol/streaming/noop_streaming_manager.go
@@ -78,3 +78,28 @@ func (sm *NoopGrpcStreamingManager) InitializeNewStreams(

func (sm *NoopGrpcStreamingManager) Stop() {
}

func (sm *NoopGrpcStreamingManager) StageFinalizeBlockFill(
ctx sdk.Context,
fill clobtypes.StreamOrderbookFill,
) {
}

func (sm *NoopGrpcStreamingManager) GetStagedFinalizeBlockEvents(
ctx sdk.Context,
) []clobtypes.StagedFinalizeBlockEvent {
return nil
}

func (sm *NoopGrpcStreamingManager) StageFinalizeBlockSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
) {
}

func (sm *NoopGrpcStreamingManager) StreamBatchUpdatesAfterFinalizeBlock(
ctx sdk.Context,
orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
}
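
These empty bodies keep NoopGrpcStreamingManager satisfying the streaming-manager interface now that the staging methods exist. Selection between the noop and full implementations presumably follows the GrpcStreamingEnabled branch shown in the app.go hunk above; a sketch of that branch structure (the else arm and its constructor call are assumptions, not shown in this diff):

// Assumed shape of getFullNodeStreamingManagerFromOptions' branching:
if appFlags.GrpcStreamingEnabled {
	manager = NewFullNodeStreamingManager(
		logger,
		/* streaming flags elided in this sketch */
		streamingManagerTransientStoreKey,
	)
} else {
	manager = NewNoopGrpcStreamingManager() // hypothetical constructor name
}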