Skip to content

Commit

Permalink
[OTE-456] FNS x OE: stage FinalizeBlock events and emit in `Precomm…
Browse files Browse the repository at this point in the history
…it` (#2253)
  • Loading branch information
teddyding authored and jonfung-dydx committed Sep 26, 2024
1 parent 4fe6c7d commit dddf6f8
Show file tree
Hide file tree
Showing 17 changed files with 1,118 additions and 290 deletions.
466 changes: 234 additions & 232 deletions indexer/packages/v4-protos/src/codegen/dydxprotocol/bundle.ts

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { StreamOrderbookFill, StreamOrderbookFillSDKType } from "./query";
import { StreamSubaccountUpdate, StreamSubaccountUpdateSDKType } from "../subaccounts/streaming";
import * as _m0 from "protobufjs/minimal";
import { DeepPartial } from "../../helpers";
/** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */

export interface StagedFinalizeBlockEvent {
orderFill?: StreamOrderbookFill;
subaccountUpdate?: StreamSubaccountUpdate;
}
/** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */

export interface StagedFinalizeBlockEventSDKType {
order_fill?: StreamOrderbookFillSDKType;
subaccount_update?: StreamSubaccountUpdateSDKType;
}

function createBaseStagedFinalizeBlockEvent(): StagedFinalizeBlockEvent {
return {
orderFill: undefined,
subaccountUpdate: undefined
};
}

export const StagedFinalizeBlockEvent = {
encode(message: StagedFinalizeBlockEvent, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.orderFill !== undefined) {
StreamOrderbookFill.encode(message.orderFill, writer.uint32(10).fork()).ldelim();
}

if (message.subaccountUpdate !== undefined) {
StreamSubaccountUpdate.encode(message.subaccountUpdate, writer.uint32(18).fork()).ldelim();
}

return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): StagedFinalizeBlockEvent {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseStagedFinalizeBlockEvent();

while (reader.pos < end) {
const tag = reader.uint32();

switch (tag >>> 3) {
case 1:
message.orderFill = StreamOrderbookFill.decode(reader, reader.uint32());
break;

case 2:
message.subaccountUpdate = StreamSubaccountUpdate.decode(reader, reader.uint32());
break;

default:
reader.skipType(tag & 7);
break;
}
}

return message;
},

fromPartial(object: DeepPartial<StagedFinalizeBlockEvent>): StagedFinalizeBlockEvent {
const message = createBaseStagedFinalizeBlockEvent();
message.orderFill = object.orderFill !== undefined && object.orderFill !== null ? StreamOrderbookFill.fromPartial(object.orderFill) : undefined;
message.subaccountUpdate = object.subaccountUpdate !== undefined && object.subaccountUpdate !== null ? StreamSubaccountUpdate.fromPartial(object.subaccountUpdate) : undefined;
return message;
}

};
4 changes: 2 additions & 2 deletions indexer/packages/v4-protos/src/codegen/gogoproto/bundle.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
import * as _117 from "./gogo";
export const gogoproto = { ..._117
import * as _118 from "./gogo";
export const gogoproto = { ..._118
};
22 changes: 11 additions & 11 deletions indexer/packages/v4-protos/src/codegen/google/bundle.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import * as _118 from "./api/annotations";
import * as _119 from "./api/http";
import * as _120 from "./protobuf/descriptor";
import * as _121 from "./protobuf/duration";
import * as _122 from "./protobuf/timestamp";
import * as _123 from "./protobuf/any";
import * as _119 from "./api/annotations";
import * as _120 from "./api/http";
import * as _121 from "./protobuf/descriptor";
import * as _122 from "./protobuf/duration";
import * as _123 from "./protobuf/timestamp";
import * as _124 from "./protobuf/any";
export namespace google {
export const api = { ..._118,
..._119
export const api = { ..._119,
..._120
};
export const protobuf = { ..._120,
..._121,
export const protobuf = { ..._121,
..._122,
..._123
..._123,
..._124
};
}
16 changes: 16 additions & 0 deletions proto/dydxprotocol/clob/streaming.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";
package dydxprotocol.clob;

import "dydxprotocol/subaccounts/streaming.proto";
import "dydxprotocol/clob/query.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/x/clob/types";

// StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`.
message StagedFinalizeBlockEvent {
// Contains one of StreamOrderbookFill, StreamSubaccountUpdate.
oneof event {
StreamOrderbookFill order_fill = 1;
dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2;
}
}
4 changes: 4 additions & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func New(
statsmoduletypes.TransientStoreKey,
rewardsmoduletypes.TransientStoreKey,
indexer_manager.TransientStoreKey,
streaming.StreamingManagerTransientStoreKey,
perpetualsmoduletypes.TransientStoreKey,
)
memKeys := storetypes.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, clobmoduletypes.MemStoreKey)
Expand Down Expand Up @@ -762,6 +763,7 @@ func New(
appFlags,
appCodec,
logger,
tkeys[streaming.StreamingManagerTransientStoreKey],
)

timeProvider := &timelib.TimeProviderImpl{}
Expand Down Expand Up @@ -2029,6 +2031,7 @@ func getFullNodeStreamingManagerFromOptions(
appFlags flags.Flags,
cdc codec.Codec,
logger log.Logger,
streamingManagerTransientStoreKey storetypes.StoreKey,
) (manager streamingtypes.FullNodeStreamingManager, wsServer *ws.WebsocketServer) {
logger = logger.With(log.ModuleKey, "full-node-streaming")
if appFlags.GrpcStreamingEnabled {
Expand All @@ -2042,6 +2045,7 @@ func getFullNodeStreamingManagerFromOptions(
appFlags.GrpcStreamingMaxBatchSize,
appFlags.GrpcStreamingMaxChannelBufferSize,
appFlags.FullNodeStreamingSnapshotInterval,
streamingManagerTransientStoreKey,
)

// Start websocket server.
Expand Down
1 change: 1 addition & 0 deletions protocol/lib/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ const (
UpdateType = "update_type"
ValidateMatches = "validate_matches"
ValidateOrder = "validate_order"
StreamBatchUpdatesAfterFinalizeBlock = "stream_batch_updates_after_finalize_block"

// MemCLOB.
AddedToOrderBook = "added_to_orderbook"
Expand Down
29 changes: 16 additions & 13 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,22 @@ const (
GateWithdrawalsIfNegativeTncSubaccountSeenLatency = "gate_withdrawals_if_negative_tnc_subaccount_seen_latency"

// Full node grpc
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency"
GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count"
GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count"
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
GrpcStreamSubscriberCount = "grpc_stream_subscriber_count"
GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered"
GrpcFlushUpdatesLatency = "grpc_flush_updates_latency"
GrpcSubscriptionChannelLength = "grpc_subscription_channel_length"
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency"
GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count"
GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count"
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
GrpcStreamSubscriberCount = "grpc_stream_subscriber_count"
GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered"
GrpcFlushUpdatesLatency = "grpc_flush_updates_latency"
GrpcSubscriptionChannelLength = "grpc_subscription_channel_length"
GrpcStagedAllFinalizeBlockUpdatesCount = "grpc_staged_all_finalize_block_updates_count"
GrpcStagedFillFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_fill_updates_count"
GrpcStagedSubaccountFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_subaccount_updates_count"

EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
Expand Down
13 changes: 13 additions & 0 deletions protocol/streaming/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package streaming

// Constants for FullNodeStreamingManager.
const (
// Transient store key for storing staged events.
StreamingManagerTransientStoreKey = "tmp_streaming"

// Key for storing the count of staged events.
StagedEventsCountKey = "EvtCnt"

// Key prefix for staged events.
StagedEventsKeyPrefix = "Evt:"
)
Loading

0 comments on commit dddf6f8

Please sign in to comment.