diff --git a/indexer/packages/v4-protos/src/codegen/dydxprotocol/bundle.ts b/indexer/packages/v4-protos/src/codegen/dydxprotocol/bundle.ts index fe4846372f..135d26b777 100644 --- a/indexer/packages/v4-protos/src/codegen/dydxprotocol/bundle.ts +++ b/indexer/packages/v4-protos/src/codegen/dydxprotocol/bundle.ts @@ -28,142 +28,143 @@ import * as _31 from "./clob/order_removals"; import * as _32 from "./clob/order"; import * as _33 from "./clob/process_proposer_matches_events"; import * as _34 from "./clob/query"; -import * as _35 from "./clob/tx"; -import * as _36 from "./daemons/bridge/bridge"; -import * as _37 from "./daemons/liquidation/liquidation"; -import * as _38 from "./daemons/pricefeed/price_feed"; -import * as _39 from "./delaymsg/block_message_ids"; -import * as _40 from "./delaymsg/delayed_message"; -import * as _41 from "./delaymsg/genesis"; -import * as _42 from "./delaymsg/query"; -import * as _43 from "./delaymsg/tx"; -import * as _44 from "./epochs/epoch_info"; -import * as _45 from "./epochs/genesis"; -import * as _46 from "./epochs/query"; -import * as _47 from "./feetiers/genesis"; -import * as _48 from "./feetiers/params"; -import * as _49 from "./feetiers/query"; -import * as _50 from "./feetiers/tx"; -import * as _51 from "./govplus/genesis"; -import * as _52 from "./govplus/query"; -import * as _53 from "./govplus/tx"; -import * as _54 from "./indexer/events/events"; -import * as _55 from "./indexer/indexer_manager/event"; -import * as _56 from "./indexer/off_chain_updates/off_chain_updates"; -import * as _57 from "./indexer/protocol/v1/clob"; -import * as _58 from "./indexer/protocol/v1/perpetual"; -import * as _59 from "./indexer/protocol/v1/subaccount"; -import * as _60 from "./indexer/redis/redis_order"; -import * as _61 from "./indexer/shared/removal_reason"; -import * as _62 from "./indexer/socks/messages"; -import * as _63 from "./listing/genesis"; -import * as _64 from "./listing/query"; -import * as _65 from "./listing/tx"; -import * as _66 from "./perpetuals/genesis"; -import * as _67 from "./perpetuals/params"; -import * as _68 from "./perpetuals/perpetual"; -import * as _69 from "./perpetuals/query"; -import * as _70 from "./perpetuals/tx"; -import * as _71 from "./prices/genesis"; -import * as _72 from "./prices/market_param"; -import * as _73 from "./prices/market_price"; -import * as _74 from "./prices/query"; -import * as _75 from "./prices/tx"; -import * as _76 from "./ratelimit/capacity"; -import * as _77 from "./ratelimit/genesis"; -import * as _78 from "./ratelimit/limit_params"; -import * as _79 from "./ratelimit/pending_send_packet"; -import * as _80 from "./ratelimit/query"; -import * as _81 from "./ratelimit/tx"; -import * as _82 from "./revshare/genesis"; -import * as _83 from "./revshare/params"; -import * as _84 from "./revshare/query"; -import * as _85 from "./revshare/revshare"; -import * as _86 from "./revshare/tx"; -import * as _87 from "./rewards/genesis"; -import * as _88 from "./rewards/params"; -import * as _89 from "./rewards/query"; -import * as _90 from "./rewards/reward_share"; -import * as _91 from "./rewards/tx"; -import * as _92 from "./sending/genesis"; -import * as _93 from "./sending/query"; -import * as _94 from "./sending/transfer"; -import * as _95 from "./sending/tx"; -import * as _96 from "./stats/genesis"; -import * as _97 from "./stats/params"; -import * as _98 from "./stats/query"; -import * as _99 from "./stats/stats"; -import * as _100 from "./stats/tx"; -import * as _101 from "./subaccounts/asset_position"; -import * as _102 from "./subaccounts/genesis"; -import * as _103 from "./subaccounts/perpetual_position"; -import * as _104 from "./subaccounts/query"; -import * as _105 from "./subaccounts/streaming"; -import * as _106 from "./subaccounts/subaccount"; -import * as _107 from "./vault/genesis"; -import * as _108 from "./vault/params"; -import * as _109 from "./vault/query"; -import * as _110 from "./vault/share"; -import * as _111 from "./vault/tx"; -import * as _112 from "./vault/vault"; -import * as _113 from "./vest/genesis"; -import * as _114 from "./vest/query"; -import * as _115 from "./vest/tx"; -import * as _116 from "./vest/vest_entry"; -import * as _124 from "./assets/query.lcd"; -import * as _125 from "./blocktime/query.lcd"; -import * as _126 from "./bridge/query.lcd"; -import * as _127 from "./clob/query.lcd"; -import * as _128 from "./delaymsg/query.lcd"; -import * as _129 from "./epochs/query.lcd"; -import * as _130 from "./feetiers/query.lcd"; -import * as _131 from "./perpetuals/query.lcd"; -import * as _132 from "./prices/query.lcd"; -import * as _133 from "./ratelimit/query.lcd"; -import * as _134 from "./revshare/query.lcd"; -import * as _135 from "./rewards/query.lcd"; -import * as _136 from "./stats/query.lcd"; -import * as _137 from "./subaccounts/query.lcd"; -import * as _138 from "./vault/query.lcd"; -import * as _139 from "./vest/query.lcd"; -import * as _140 from "./assets/query.rpc.Query"; -import * as _141 from "./blocktime/query.rpc.Query"; -import * as _142 from "./bridge/query.rpc.Query"; -import * as _143 from "./clob/query.rpc.Query"; -import * as _144 from "./delaymsg/query.rpc.Query"; -import * as _145 from "./epochs/query.rpc.Query"; -import * as _146 from "./feetiers/query.rpc.Query"; -import * as _147 from "./govplus/query.rpc.Query"; -import * as _148 from "./listing/query.rpc.Query"; -import * as _149 from "./perpetuals/query.rpc.Query"; -import * as _150 from "./prices/query.rpc.Query"; -import * as _151 from "./ratelimit/query.rpc.Query"; -import * as _152 from "./revshare/query.rpc.Query"; -import * as _153 from "./rewards/query.rpc.Query"; -import * as _154 from "./sending/query.rpc.Query"; -import * as _155 from "./stats/query.rpc.Query"; -import * as _156 from "./subaccounts/query.rpc.Query"; -import * as _157 from "./vault/query.rpc.Query"; -import * as _158 from "./vest/query.rpc.Query"; -import * as _159 from "./blocktime/tx.rpc.msg"; -import * as _160 from "./bridge/tx.rpc.msg"; -import * as _161 from "./clob/tx.rpc.msg"; -import * as _162 from "./delaymsg/tx.rpc.msg"; -import * as _163 from "./feetiers/tx.rpc.msg"; -import * as _164 from "./govplus/tx.rpc.msg"; -import * as _165 from "./listing/tx.rpc.msg"; -import * as _166 from "./perpetuals/tx.rpc.msg"; -import * as _167 from "./prices/tx.rpc.msg"; -import * as _168 from "./ratelimit/tx.rpc.msg"; -import * as _169 from "./revshare/tx.rpc.msg"; -import * as _170 from "./rewards/tx.rpc.msg"; -import * as _171 from "./sending/tx.rpc.msg"; -import * as _172 from "./stats/tx.rpc.msg"; -import * as _173 from "./vault/tx.rpc.msg"; -import * as _174 from "./vest/tx.rpc.msg"; -import * as _175 from "./lcd"; -import * as _176 from "./rpc.query"; -import * as _177 from "./rpc.tx"; +import * as _35 from "./clob/streaming"; +import * as _36 from "./clob/tx"; +import * as _37 from "./daemons/bridge/bridge"; +import * as _38 from "./daemons/liquidation/liquidation"; +import * as _39 from "./daemons/pricefeed/price_feed"; +import * as _40 from "./delaymsg/block_message_ids"; +import * as _41 from "./delaymsg/delayed_message"; +import * as _42 from "./delaymsg/genesis"; +import * as _43 from "./delaymsg/query"; +import * as _44 from "./delaymsg/tx"; +import * as _45 from "./epochs/epoch_info"; +import * as _46 from "./epochs/genesis"; +import * as _47 from "./epochs/query"; +import * as _48 from "./feetiers/genesis"; +import * as _49 from "./feetiers/params"; +import * as _50 from "./feetiers/query"; +import * as _51 from "./feetiers/tx"; +import * as _52 from "./govplus/genesis"; +import * as _53 from "./govplus/query"; +import * as _54 from "./govplus/tx"; +import * as _55 from "./indexer/events/events"; +import * as _56 from "./indexer/indexer_manager/event"; +import * as _57 from "./indexer/off_chain_updates/off_chain_updates"; +import * as _58 from "./indexer/protocol/v1/clob"; +import * as _59 from "./indexer/protocol/v1/perpetual"; +import * as _60 from "./indexer/protocol/v1/subaccount"; +import * as _61 from "./indexer/redis/redis_order"; +import * as _62 from "./indexer/shared/removal_reason"; +import * as _63 from "./indexer/socks/messages"; +import * as _64 from "./listing/genesis"; +import * as _65 from "./listing/query"; +import * as _66 from "./listing/tx"; +import * as _67 from "./perpetuals/genesis"; +import * as _68 from "./perpetuals/params"; +import * as _69 from "./perpetuals/perpetual"; +import * as _70 from "./perpetuals/query"; +import * as _71 from "./perpetuals/tx"; +import * as _72 from "./prices/genesis"; +import * as _73 from "./prices/market_param"; +import * as _74 from "./prices/market_price"; +import * as _75 from "./prices/query"; +import * as _76 from "./prices/tx"; +import * as _77 from "./ratelimit/capacity"; +import * as _78 from "./ratelimit/genesis"; +import * as _79 from "./ratelimit/limit_params"; +import * as _80 from "./ratelimit/pending_send_packet"; +import * as _81 from "./ratelimit/query"; +import * as _82 from "./ratelimit/tx"; +import * as _83 from "./revshare/genesis"; +import * as _84 from "./revshare/params"; +import * as _85 from "./revshare/query"; +import * as _86 from "./revshare/revshare"; +import * as _87 from "./revshare/tx"; +import * as _88 from "./rewards/genesis"; +import * as _89 from "./rewards/params"; +import * as _90 from "./rewards/query"; +import * as _91 from "./rewards/reward_share"; +import * as _92 from "./rewards/tx"; +import * as _93 from "./sending/genesis"; +import * as _94 from "./sending/query"; +import * as _95 from "./sending/transfer"; +import * as _96 from "./sending/tx"; +import * as _97 from "./stats/genesis"; +import * as _98 from "./stats/params"; +import * as _99 from "./stats/query"; +import * as _100 from "./stats/stats"; +import * as _101 from "./stats/tx"; +import * as _102 from "./subaccounts/asset_position"; +import * as _103 from "./subaccounts/genesis"; +import * as _104 from "./subaccounts/perpetual_position"; +import * as _105 from "./subaccounts/query"; +import * as _106 from "./subaccounts/streaming"; +import * as _107 from "./subaccounts/subaccount"; +import * as _108 from "./vault/genesis"; +import * as _109 from "./vault/params"; +import * as _110 from "./vault/query"; +import * as _111 from "./vault/share"; +import * as _112 from "./vault/tx"; +import * as _113 from "./vault/vault"; +import * as _114 from "./vest/genesis"; +import * as _115 from "./vest/query"; +import * as _116 from "./vest/tx"; +import * as _117 from "./vest/vest_entry"; +import * as _125 from "./assets/query.lcd"; +import * as _126 from "./blocktime/query.lcd"; +import * as _127 from "./bridge/query.lcd"; +import * as _128 from "./clob/query.lcd"; +import * as _129 from "./delaymsg/query.lcd"; +import * as _130 from "./epochs/query.lcd"; +import * as _131 from "./feetiers/query.lcd"; +import * as _132 from "./perpetuals/query.lcd"; +import * as _133 from "./prices/query.lcd"; +import * as _134 from "./ratelimit/query.lcd"; +import * as _135 from "./revshare/query.lcd"; +import * as _136 from "./rewards/query.lcd"; +import * as _137 from "./stats/query.lcd"; +import * as _138 from "./subaccounts/query.lcd"; +import * as _139 from "./vault/query.lcd"; +import * as _140 from "./vest/query.lcd"; +import * as _141 from "./assets/query.rpc.Query"; +import * as _142 from "./blocktime/query.rpc.Query"; +import * as _143 from "./bridge/query.rpc.Query"; +import * as _144 from "./clob/query.rpc.Query"; +import * as _145 from "./delaymsg/query.rpc.Query"; +import * as _146 from "./epochs/query.rpc.Query"; +import * as _147 from "./feetiers/query.rpc.Query"; +import * as _148 from "./govplus/query.rpc.Query"; +import * as _149 from "./listing/query.rpc.Query"; +import * as _150 from "./perpetuals/query.rpc.Query"; +import * as _151 from "./prices/query.rpc.Query"; +import * as _152 from "./ratelimit/query.rpc.Query"; +import * as _153 from "./revshare/query.rpc.Query"; +import * as _154 from "./rewards/query.rpc.Query"; +import * as _155 from "./sending/query.rpc.Query"; +import * as _156 from "./stats/query.rpc.Query"; +import * as _157 from "./subaccounts/query.rpc.Query"; +import * as _158 from "./vault/query.rpc.Query"; +import * as _159 from "./vest/query.rpc.Query"; +import * as _160 from "./blocktime/tx.rpc.msg"; +import * as _161 from "./bridge/tx.rpc.msg"; +import * as _162 from "./clob/tx.rpc.msg"; +import * as _163 from "./delaymsg/tx.rpc.msg"; +import * as _164 from "./feetiers/tx.rpc.msg"; +import * as _165 from "./govplus/tx.rpc.msg"; +import * as _166 from "./listing/tx.rpc.msg"; +import * as _167 from "./perpetuals/tx.rpc.msg"; +import * as _168 from "./prices/tx.rpc.msg"; +import * as _169 from "./ratelimit/tx.rpc.msg"; +import * as _170 from "./revshare/tx.rpc.msg"; +import * as _171 from "./rewards/tx.rpc.msg"; +import * as _172 from "./sending/tx.rpc.msg"; +import * as _173 from "./stats/tx.rpc.msg"; +import * as _174 from "./vault/tx.rpc.msg"; +import * as _175 from "./vest/tx.rpc.msg"; +import * as _176 from "./lcd"; +import * as _177 from "./rpc.query"; +import * as _178 from "./rpc.tx"; export namespace dydxprotocol { export const accountplus = { ..._5, ..._6 @@ -172,17 +173,17 @@ export namespace dydxprotocol { ..._8, ..._9, ..._10, - ..._124, - ..._140 + ..._125, + ..._141 }; export const blocktime = { ..._11, ..._12, ..._13, ..._14, ..._15, - ..._125, - ..._141, - ..._159 + ..._126, + ..._142, + ..._160 }; export const bridge = { ..._16, ..._17, @@ -190,9 +191,9 @@ export namespace dydxprotocol { ..._19, ..._20, ..._21, - ..._126, - ..._142, - ..._160 + ..._127, + ..._143, + ..._161 }; export const clob = { ..._22, ..._23, @@ -208,164 +209,165 @@ export namespace dydxprotocol { ..._33, ..._34, ..._35, - ..._127, - ..._143, - ..._161 + ..._36, + ..._128, + ..._144, + ..._162 }; export namespace daemons { - export const bridge = { ..._36 + export const bridge = { ..._37 }; - export const liquidation = { ..._37 + export const liquidation = { ..._38 }; - export const pricefeed = { ..._38 + export const pricefeed = { ..._39 }; } - export const delaymsg = { ..._39, - ..._40, + export const delaymsg = { ..._40, ..._41, ..._42, ..._43, - ..._128, - ..._144, - ..._162 + ..._44, + ..._129, + ..._145, + ..._163 }; - export const epochs = { ..._44, - ..._45, + export const epochs = { ..._45, ..._46, - ..._129, - ..._145 + ..._47, + ..._130, + ..._146 }; - export const feetiers = { ..._47, - ..._48, + export const feetiers = { ..._48, ..._49, ..._50, - ..._130, - ..._146, - ..._163 - }; - export const govplus = { ..._51, - ..._52, - ..._53, + ..._51, + ..._131, ..._147, ..._164 }; + export const govplus = { ..._52, + ..._53, + ..._54, + ..._148, + ..._165 + }; export namespace indexer { - export const events = { ..._54 + export const events = { ..._55 }; - export const indexer_manager = { ..._55 + export const indexer_manager = { ..._56 }; - export const off_chain_updates = { ..._56 + export const off_chain_updates = { ..._57 }; export namespace protocol { - export const v1 = { ..._57, - ..._58, - ..._59 + export const v1 = { ..._58, + ..._59, + ..._60 }; } - export const redis = { ..._60 + export const redis = { ..._61 }; - export const shared = { ..._61 + export const shared = { ..._62 }; - export const socks = { ..._62 + export const socks = { ..._63 }; } - export const listing = { ..._63, - ..._64, + export const listing = { ..._64, ..._65, - ..._148, - ..._165 + ..._66, + ..._149, + ..._166 }; - export const perpetuals = { ..._66, - ..._67, + export const perpetuals = { ..._67, ..._68, ..._69, ..._70, - ..._131, - ..._149, - ..._166 + ..._71, + ..._132, + ..._150, + ..._167 }; - export const prices = { ..._71, - ..._72, + export const prices = { ..._72, ..._73, ..._74, ..._75, - ..._132, - ..._150, - ..._167 + ..._76, + ..._133, + ..._151, + ..._168 }; - export const ratelimit = { ..._76, - ..._77, + export const ratelimit = { ..._77, ..._78, ..._79, ..._80, ..._81, - ..._133, - ..._151, - ..._168 + ..._82, + ..._134, + ..._152, + ..._169 }; - export const revshare = { ..._82, - ..._83, + export const revshare = { ..._83, ..._84, ..._85, ..._86, - ..._134, - ..._152, - ..._169 + ..._87, + ..._135, + ..._153, + ..._170 }; - export const rewards = { ..._87, - ..._88, + export const rewards = { ..._88, ..._89, ..._90, ..._91, - ..._135, - ..._153, - ..._170 + ..._92, + ..._136, + ..._154, + ..._171 }; - export const sending = { ..._92, - ..._93, + export const sending = { ..._93, ..._94, ..._95, - ..._154, - ..._171 + ..._96, + ..._155, + ..._172 }; - export const stats = { ..._96, - ..._97, + export const stats = { ..._97, ..._98, ..._99, ..._100, - ..._136, - ..._155, - ..._172 + ..._101, + ..._137, + ..._156, + ..._173 }; - export const subaccounts = { ..._101, - ..._102, + export const subaccounts = { ..._102, ..._103, ..._104, ..._105, ..._106, - ..._137, - ..._156 + ..._107, + ..._138, + ..._157 }; - export const vault = { ..._107, - ..._108, + export const vault = { ..._108, ..._109, ..._110, ..._111, ..._112, - ..._138, - ..._157, - ..._173 - }; - export const vest = { ..._113, - ..._114, - ..._115, - ..._116, + ..._113, ..._139, ..._158, ..._174 }; - export const ClientFactory = { ..._175, - ..._176, - ..._177 + export const vest = { ..._114, + ..._115, + ..._116, + ..._117, + ..._140, + ..._159, + ..._175 + }; + export const ClientFactory = { ..._176, + ..._177, + ..._178 }; } \ No newline at end of file diff --git a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/streaming.ts b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/streaming.ts new file mode 100644 index 0000000000..dab8f1e122 --- /dev/null +++ b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/streaming.ts @@ -0,0 +1,71 @@ +import { StreamOrderbookFill, StreamOrderbookFillSDKType } from "./query"; +import { StreamSubaccountUpdate, StreamSubaccountUpdateSDKType } from "../subaccounts/streaming"; +import * as _m0 from "protobufjs/minimal"; +import { DeepPartial } from "../../helpers"; +/** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */ + +export interface StagedFinalizeBlockEvent { + orderFill?: StreamOrderbookFill; + subaccountUpdate?: StreamSubaccountUpdate; +} +/** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */ + +export interface StagedFinalizeBlockEventSDKType { + order_fill?: StreamOrderbookFillSDKType; + subaccount_update?: StreamSubaccountUpdateSDKType; +} + +function createBaseStagedFinalizeBlockEvent(): StagedFinalizeBlockEvent { + return { + orderFill: undefined, + subaccountUpdate: undefined + }; +} + +export const StagedFinalizeBlockEvent = { + encode(message: StagedFinalizeBlockEvent, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.orderFill !== undefined) { + StreamOrderbookFill.encode(message.orderFill, writer.uint32(10).fork()).ldelim(); + } + + if (message.subaccountUpdate !== undefined) { + StreamSubaccountUpdate.encode(message.subaccountUpdate, writer.uint32(18).fork()).ldelim(); + } + + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): StagedFinalizeBlockEvent { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseStagedFinalizeBlockEvent(); + + while (reader.pos < end) { + const tag = reader.uint32(); + + switch (tag >>> 3) { + case 1: + message.orderFill = StreamOrderbookFill.decode(reader, reader.uint32()); + break; + + case 2: + message.subaccountUpdate = StreamSubaccountUpdate.decode(reader, reader.uint32()); + break; + + default: + reader.skipType(tag & 7); + break; + } + } + + return message; + }, + + fromPartial(object: DeepPartial): StagedFinalizeBlockEvent { + const message = createBaseStagedFinalizeBlockEvent(); + message.orderFill = object.orderFill !== undefined && object.orderFill !== null ? StreamOrderbookFill.fromPartial(object.orderFill) : undefined; + message.subaccountUpdate = object.subaccountUpdate !== undefined && object.subaccountUpdate !== null ? StreamSubaccountUpdate.fromPartial(object.subaccountUpdate) : undefined; + return message; + } + +}; \ No newline at end of file diff --git a/indexer/packages/v4-protos/src/codegen/gogoproto/bundle.ts b/indexer/packages/v4-protos/src/codegen/gogoproto/bundle.ts index cdc090d0d6..318577daaa 100644 --- a/indexer/packages/v4-protos/src/codegen/gogoproto/bundle.ts +++ b/indexer/packages/v4-protos/src/codegen/gogoproto/bundle.ts @@ -1,3 +1,3 @@ -import * as _117 from "./gogo"; -export const gogoproto = { ..._117 +import * as _118 from "./gogo"; +export const gogoproto = { ..._118 }; \ No newline at end of file diff --git a/indexer/packages/v4-protos/src/codegen/google/bundle.ts b/indexer/packages/v4-protos/src/codegen/google/bundle.ts index 7103f2f637..75cf041bac 100644 --- a/indexer/packages/v4-protos/src/codegen/google/bundle.ts +++ b/indexer/packages/v4-protos/src/codegen/google/bundle.ts @@ -1,16 +1,16 @@ -import * as _118 from "./api/annotations"; -import * as _119 from "./api/http"; -import * as _120 from "./protobuf/descriptor"; -import * as _121 from "./protobuf/duration"; -import * as _122 from "./protobuf/timestamp"; -import * as _123 from "./protobuf/any"; +import * as _119 from "./api/annotations"; +import * as _120 from "./api/http"; +import * as _121 from "./protobuf/descriptor"; +import * as _122 from "./protobuf/duration"; +import * as _123 from "./protobuf/timestamp"; +import * as _124 from "./protobuf/any"; export namespace google { - export const api = { ..._118, - ..._119 + export const api = { ..._119, + ..._120 }; - export const protobuf = { ..._120, - ..._121, + export const protobuf = { ..._121, ..._122, - ..._123 + ..._123, + ..._124 }; } \ No newline at end of file diff --git a/proto/dydxprotocol/clob/streaming.proto b/proto/dydxprotocol/clob/streaming.proto new file mode 100644 index 0000000000..06c74ffbe1 --- /dev/null +++ b/proto/dydxprotocol/clob/streaming.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; +package dydxprotocol.clob; + +import "dydxprotocol/subaccounts/streaming.proto"; +import "dydxprotocol/clob/query.proto"; + +option go_package = "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"; + +// StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. +message StagedFinalizeBlockEvent { + // Contains one of StreamOrderbookFill, StreamSubaccountUpdate. + oneof event { + StreamOrderbookFill order_fill = 1; + dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2; + } +} diff --git a/protocol/app/app.go b/protocol/app/app.go index 35386767e8..67d12d2711 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -467,6 +467,7 @@ func New( statsmoduletypes.TransientStoreKey, rewardsmoduletypes.TransientStoreKey, indexer_manager.TransientStoreKey, + streaming.StreamingManagerTransientStoreKey, perpetualsmoduletypes.TransientStoreKey, ) memKeys := storetypes.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, clobmoduletypes.MemStoreKey) @@ -762,6 +763,7 @@ func New( appFlags, appCodec, logger, + tkeys[streaming.StreamingManagerTransientStoreKey], ) timeProvider := &timelib.TimeProviderImpl{} @@ -2029,6 +2031,7 @@ func getFullNodeStreamingManagerFromOptions( appFlags flags.Flags, cdc codec.Codec, logger log.Logger, + streamingManagerTransientStoreKey storetypes.StoreKey, ) (manager streamingtypes.FullNodeStreamingManager, wsServer *ws.WebsocketServer) { logger = logger.With(log.ModuleKey, "full-node-streaming") if appFlags.GrpcStreamingEnabled { @@ -2042,6 +2045,7 @@ func getFullNodeStreamingManagerFromOptions( appFlags.GrpcStreamingMaxBatchSize, appFlags.GrpcStreamingMaxChannelBufferSize, appFlags.FullNodeStreamingSnapshotInterval, + streamingManagerTransientStoreKey, ) // Start websocket server. diff --git a/protocol/lib/metrics/constants.go b/protocol/lib/metrics/constants.go index 0ac766bf5b..a0f851eb58 100644 --- a/protocol/lib/metrics/constants.go +++ b/protocol/lib/metrics/constants.go @@ -202,6 +202,7 @@ const ( UpdateType = "update_type" ValidateMatches = "validate_matches" ValidateOrder = "validate_order" + StreamBatchUpdatesAfterFinalizeBlock = "stream_batch_updates_after_finalize_block" // MemCLOB. AddedToOrderBook = "added_to_orderbook" diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index e38890d8ac..ac8e8a17d1 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -66,19 +66,22 @@ const ( GateWithdrawalsIfNegativeTncSubaccountSeenLatency = "gate_withdrawals_if_negative_tnc_subaccount_seen_latency" // Full node grpc - FullNodeGrpc = "full_node_grpc" - GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" - GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency" - GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency" - GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" - GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency" - GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count" - GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count" - GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count" - GrpcStreamSubscriberCount = "grpc_stream_subscriber_count" - GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered" - GrpcFlushUpdatesLatency = "grpc_flush_updates_latency" - GrpcSubscriptionChannelLength = "grpc_subscription_channel_length" + FullNodeGrpc = "full_node_grpc" + GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" + GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency" + GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency" + GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" + GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency" + GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count" + GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count" + GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count" + GrpcStreamSubscriberCount = "grpc_stream_subscriber_count" + GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered" + GrpcFlushUpdatesLatency = "grpc_flush_updates_latency" + GrpcSubscriptionChannelLength = "grpc_subscription_channel_length" + GrpcStagedAllFinalizeBlockUpdatesCount = "grpc_staged_all_finalize_block_updates_count" + GrpcStagedFillFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_fill_updates_count" + GrpcStagedSubaccountFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_subaccount_updates_count" EndBlocker = "end_blocker" EndBlockerLag = "end_blocker_lag" diff --git a/protocol/streaming/constants.go b/protocol/streaming/constants.go new file mode 100644 index 0000000000..3e3a3a6676 --- /dev/null +++ b/protocol/streaming/constants.go @@ -0,0 +1,13 @@ +package streaming + +// Constants for FullNodeStreamingManager. +const ( + // Transient store key for storing staged events. + StreamingManagerTransientStoreKey = "tmp_streaming" + + // Key for storing the count of staged events. + StagedEventsCountKey = "EvtCnt" + + // Key prefix for staged events. + StagedEventsKeyPrefix = "Evt:" +) diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index a6fe37c512..34b7eaaf36 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -1,15 +1,20 @@ package streaming import ( + "encoding/binary" "fmt" "sync" "sync/atomic" "time" + "github.com/dydxprotocol/v4-chain/protocol/lib" satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" "cosmossdk.io/log" + "cosmossdk.io/store/prefix" + storetypes "cosmossdk.io/store/types" sdk "github.com/cosmos/cosmos-sdk/types" + ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types" "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" "github.com/dydxprotocol/v4-chain/protocol/streaming/types" streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util" @@ -50,6 +55,9 @@ type FullNodeStreamingManagerImpl struct { // Block interval in which snapshot info should be sent out in. // Defaults to 0, which means only one snapshot will be sent out. snapshotBlockInterval uint32 + + // stores the staged FinalizeBlock events for full node streaming. + streamingManagerTransientStoreKey storetypes.StoreKey } // OrderbookSubscription represents a active subscription to the orderbook updates stream. @@ -86,6 +94,7 @@ func NewFullNodeStreamingManager( maxUpdatesInCache uint32, maxSubscriptionChannelSize uint32, snapshotBlockInterval uint32, + streamingManagerTransientStoreKey storetypes.StoreKey, ) *FullNodeStreamingManagerImpl { fullNodeStreamingManager := &FullNodeStreamingManagerImpl{ logger: logger, @@ -102,6 +111,8 @@ func NewFullNodeStreamingManager( maxUpdatesInCache: maxUpdatesInCache, maxSubscriptionChannelSize: maxSubscriptionChannelSize, snapshotBlockInterval: snapshotBlockInterval, + + streamingManagerTransientStoreKey: streamingManagerTransientStoreKey, } // Start the goroutine for pushing order updates through. @@ -367,6 +378,88 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates( } } +func getStagedEventsCount(store storetypes.KVStore) uint32 { + countsBytes := store.Get([]byte(StagedEventsCountKey)) + if countsBytes == nil { + return 0 + } + return binary.BigEndian.Uint32(countsBytes) +} + +// Stage a subaccount update event in transient store, during `FinalizeBlock`. +func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockSubaccountUpdate( + ctx sdk.Context, + subaccountUpdate satypes.StreamSubaccountUpdate, +) { + stagedEvent := clobtypes.StagedFinalizeBlockEvent{ + Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{ + SubaccountUpdate: &subaccountUpdate, + }, + } + sm.stageFinalizeBlockEvent( + ctx, + clobtypes.Amino.MustMarshal(stagedEvent), + ) +} + +// Stage a fill event in transient store, during `FinalizeBlock`. +// Since `FinalizeBlock` code block can be called more than once with optimistic +// execution (once optimistically and optionally once on the canonical block), +// we need to stage the events in transient store and later emit them +// during `Precommit`. +func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill( + ctx sdk.Context, + fill clobtypes.StreamOrderbookFill, +) { + stagedEvent := clobtypes.StagedFinalizeBlockEvent{ + Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{ + OrderFill: &fill, + }, + } + sm.stageFinalizeBlockEvent( + ctx, + clobtypes.Amino.MustMarshal(stagedEvent), + ) +} + +func getStagedFinalizeBlockEvents(store storetypes.KVStore) []clobtypes.StagedFinalizeBlockEvent { + count := getStagedEventsCount(store) + events := make([]clobtypes.StagedFinalizeBlockEvent, count) + store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix)) + for i := uint32(0); i < count; i++ { + var event clobtypes.StagedFinalizeBlockEvent + bytes := store.Get(lib.Uint32ToKey(i)) + clobtypes.Amino.MustUnmarshal(bytes, &event) + events[i] = event + } + return events +} + +// Retrieve all events staged during `FinalizeBlock`. +func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents( + ctx sdk.Context, +) []clobtypes.StagedFinalizeBlockEvent { + noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter()) + store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey) + return getStagedFinalizeBlockEvents(store) +} + +func (sm *FullNodeStreamingManagerImpl) stageFinalizeBlockEvent( + ctx sdk.Context, + eventBytes []byte, +) { + noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter()) + store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey) + + // Increment events count. + count := getStagedEventsCount(store) + store.Set([]byte(StagedEventsCountKey), lib.Uint32ToKey(count+1)) + + // Store events keyed by index. + store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix)) + store.Set(lib.Uint32ToKey(count), eventBytes) +} + // SendCombinedSnapshot sends messages to a particular subscriber without buffering. // Note this method requires the lock and assumes that the lock has already been // acquired by the caller. @@ -703,6 +796,80 @@ func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams( return ret } +// Grpc Streaming logic after consensus agrees on a block. +// - Stream all events staged during `FinalizeBlock`. +// - Stream orderbook updates to sync fills in local ops queue. +func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( + ctx sdk.Context, + orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates, + perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, +) { + // Flush all pending updates, since we want the onchain updates to arrive in a batch. + sm.FlushStreamUpdates() + + finalizedFills, finalizedSubaccountUpdates := sm.getStagedEventsFromFinalizeBlock(ctx) + + // TODO(CT-1190): Stream below in a single batch. + // Send orderbook updates to sync optimistic orderbook onchain state after FinalizeBlock. + sm.SendOrderbookUpdates( + orderBookUpdatesToSyncLocalOpsQueue, + uint32(ctx.BlockHeight()), + ctx.ExecMode(), + ) + + // Send finalized fills from FinalizeBlock. + sm.SendOrderbookFillUpdates( + finalizedFills, + uint32(ctx.BlockHeight()), + ctx.ExecMode(), + perpetualIdToClobPairId, + ) + + // Send finalized subaccount updates from FinalizeBlock. + sm.SendFinalizedSubaccountUpdates( + finalizedSubaccountUpdates, + uint32(ctx.BlockHeight()), + ctx.ExecMode(), + ) +} + +// getStagedEventsFromFinalizeBlock returns staged events from `FinalizeBlock`. +// It should be called after the consensus agrees on a block (e.g. Precommitter). +func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock( + ctx sdk.Context, +) ( + finalizedFills []clobtypes.StreamOrderbookFill, + finalizedSubaccountUpdates []satypes.StreamSubaccountUpdate, +) { + // Get onchain stream events stored in transient store. + stagedEvents := sm.GetStagedFinalizeBlockEvents(ctx) + + metrics.SetGauge( + metrics.GrpcStagedAllFinalizeBlockUpdatesCount, + float32(len(stagedEvents)), + ) + + for _, stagedEvent := range stagedEvents { + switch event := stagedEvent.Event.(type) { + case *clobtypes.StagedFinalizeBlockEvent_OrderFill: + finalizedFills = append(finalizedFills, *event.OrderFill) + case *clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate: + finalizedSubaccountUpdates = append(finalizedSubaccountUpdates, *event.SubaccountUpdate) + } + } + + metrics.SetGauge( + metrics.GrpcStagedSubaccountFinalizeBlockUpdatesCount, + float32(len(finalizedSubaccountUpdates)), + ) + metrics.SetGauge( + metrics.GrpcStagedFillFinalizeBlockUpdatesCount, + float32(len(finalizedFills)), + ) + + return finalizedFills, finalizedSubaccountUpdates +} + func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams( getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates, subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate, diff --git a/protocol/streaming/noop_streaming_manager.go b/protocol/streaming/noop_streaming_manager.go index 6e9c00895d..4df60bc427 100644 --- a/protocol/streaming/noop_streaming_manager.go +++ b/protocol/streaming/noop_streaming_manager.go @@ -78,3 +78,28 @@ func (sm *NoopGrpcStreamingManager) InitializeNewStreams( func (sm *NoopGrpcStreamingManager) Stop() { } + +func (sm *NoopGrpcStreamingManager) StageFinalizeBlockFill( + ctx sdk.Context, + fill clobtypes.StreamOrderbookFill, +) { +} + +func (sm *NoopGrpcStreamingManager) GetStagedFinalizeBlockEvents( + ctx sdk.Context, +) []clobtypes.StagedFinalizeBlockEvent { + return nil +} + +func (sm *NoopGrpcStreamingManager) StageFinalizeBlockSubaccountUpdate( + ctx sdk.Context, + subaccountUpdate satypes.StreamSubaccountUpdate, +) { +} + +func (sm *NoopGrpcStreamingManager) StreamBatchUpdatesAfterFinalizeBlock( + ctx sdk.Context, + orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates, + perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, +) { +} diff --git a/protocol/streaming/types/interface.go b/protocol/streaming/types/interface.go index e3dff9d94b..0f097d3e75 100644 --- a/protocol/streaming/types/interface.go +++ b/protocol/streaming/types/interface.go @@ -50,7 +50,23 @@ type FullNodeStreamingManager interface { blockHeight uint32, execMode sdk.ExecMode, ) + StageFinalizeBlockFill( + ctx sdk.Context, + fill clobtypes.StreamOrderbookFill, + ) + StageFinalizeBlockSubaccountUpdate( + ctx sdk.Context, + subaccountUpdate satypes.StreamSubaccountUpdate, + ) + GetStagedFinalizeBlockEvents( + ctx sdk.Context, + ) []clobtypes.StagedFinalizeBlockEvent TracksSubaccountId(id satypes.SubaccountId) bool + StreamBatchUpdatesAfterFinalizeBlock( + ctx sdk.Context, + orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates, + perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, + ) } type OutgoingMessageSender interface { diff --git a/protocol/x/clob/abci.go b/protocol/x/clob/abci.go index 96bacb2cc5..9fd5f4a0b5 100644 --- a/protocol/x/clob/abci.go +++ b/protocol/x/clob/abci.go @@ -46,6 +46,17 @@ func BeginBlocker( keeper.ResetAllDeliveredOrderIds(ctx) } +// Precommit executes all ABCI Precommit logic respective to the clob module. +func Precommit( + ctx sdk.Context, + keeper keeper.Keeper, +) { + if streamingManager := keeper.GetFullNodeStreamingManager(); !streamingManager.Enabled() { + return + } + keeper.StreamBatchUpdatesAfterFinalizeBlock(ctx) +} + // EndBlocker executes all ABCI EndBlock logic respective to the clob module. func EndBlocker( ctx sdk.Context, diff --git a/protocol/x/clob/keeper/grpc_stream_finalize_block.go b/protocol/x/clob/keeper/grpc_stream_finalize_block.go new file mode 100644 index 0000000000..1bc23e46c6 --- /dev/null +++ b/protocol/x/clob/keeper/grpc_stream_finalize_block.go @@ -0,0 +1,50 @@ +package keeper + +import ( + "time" + + "github.com/cosmos/cosmos-sdk/telemetry" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" + "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" +) + +// Returns the order updates needed to update the fill amount for the orders +// from local ops queue, according to the latest onchain state (after FinalizeBlock). +// Effectively reverts the optimistic fill amounts removed from the CheckTx to DeliverTx state transition. +func (k Keeper) getUpdatesToSyncLocalOpsQueue( + ctx sdk.Context, +) *types.OffchainUpdates { + localValidatorOperationsQueue, _ := k.MemClob.GetOperationsToReplay(ctx) + fetchOrdersInvolvedInOpQueue(localValidatorOperationsQueue) + orderIdsFromLocal := fetchOrdersInvolvedInOpQueue( + localValidatorOperationsQueue, + ) + allUpdates := types.NewOffchainUpdates() + for orderId := range orderIdsFromLocal { + orderbookUpdate := k.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) + allUpdates.Append(orderbookUpdate) + } + return allUpdates +} + +// Grpc Streaming logic after consensus agrees on a block. +// - Stream all events staged during `FinalizeBlock`. +// - Stream orderbook updates to sync fills in local ops queue. +func (k Keeper) StreamBatchUpdatesAfterFinalizeBlock( + ctx sdk.Context, +) { + defer telemetry.MeasureSince( + time.Now(), + types.ModuleName, + metrics.StreamBatchUpdatesAfterFinalizeBlock, + metrics.Latency, + ) + orderBookUpdatesToSyncLocalOpsQueue := k.getUpdatesToSyncLocalOpsQueue(ctx) + + k.GetFullNodeStreamingManager().StreamBatchUpdatesAfterFinalizeBlock( + ctx, + orderBookUpdatesToSyncLocalOpsQueue, + k.PerpetualIdToClobPairId, + ) +} diff --git a/protocol/x/clob/keeper/process_operations.go b/protocol/x/clob/keeper/process_operations.go index d202c2a270..cddb5fb68b 100644 --- a/protocol/x/clob/keeper/process_operations.go +++ b/protocol/x/clob/keeper/process_operations.go @@ -57,26 +57,6 @@ func (k Keeper) ProcessProposerOperations( return errorsmod.Wrapf(types.ErrInvalidMsgProposedOperations, "Error: %+v", err) } - // If grpc streams are on, send absolute fill amounts from local + proposed opqueue to the grpc stream. - // This effetively reverts the optimitic orderbook updates during CheckTx. - if streamingManager := k.GetFullNodeStreamingManager(); streamingManager.Enabled() { - localValidatorOperationsQueue, _ := k.MemClob.GetOperationsToReplay(ctx) - orderIdsFromProposed := fetchOrdersInvolvedInOpQueue( - operations, - ) - orderIdsFromLocal := fetchOrdersInvolvedInOpQueue( - localValidatorOperationsQueue, - ) - orderIdSetToUpdate := lib.MergeMaps(orderIdsFromLocal, orderIdsFromProposed) - - allUpdates := types.NewOffchainUpdates() - for orderId := range orderIdSetToUpdate { - orderbookUpdate := k.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) - allUpdates.Append(orderbookUpdate) - } - k.SendOrderbookUpdates(ctx, allUpdates) - } - log.DebugLog(ctx, "Processing operations queue", log.OperationsQueue, types.GetInternalOperationsQueueTextString(operations)) @@ -550,6 +530,7 @@ func (k Keeper) PersistMatchOrdersToState( // if GRPC streaming is on, emit a generated clob match to stream. if streamingManager := k.GetFullNodeStreamingManager(); streamingManager.Enabled() { + // Note: GenerateStreamOrderbookFill doesn't rely on MemClob state. streamOrderbookFill := k.MemClob.GenerateStreamOrderbookFill( ctx, types.ClobMatch{ @@ -560,11 +541,10 @@ func (k Keeper) PersistMatchOrdersToState( &takerOrder, makerOrders, ) - k.SendOrderbookFillUpdates( + + k.GetFullNodeStreamingManager().StageFinalizeBlockFill( ctx, - []types.StreamOrderbookFill{ - streamOrderbookFill, - }, + streamOrderbookFill, ) } @@ -669,11 +649,9 @@ func (k Keeper) PersistMatchLiquidationToState( takerOrder, makerOrders, ) - k.SendOrderbookFillUpdates( + k.GetFullNodeStreamingManager().StageFinalizeBlockFill( ctx, - []types.StreamOrderbookFill{ - streamOrderbookFill, - }, + streamOrderbookFill, ) } return nil diff --git a/protocol/x/clob/types/streaming.pb.go b/protocol/x/clob/types/streaming.pb.go new file mode 100644 index 0000000000..83b8db719a --- /dev/null +++ b/protocol/x/clob/types/streaming.pb.go @@ -0,0 +1,473 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: dydxprotocol/clob/streaming.proto + +package types + +import ( + fmt "fmt" + proto "github.com/cosmos/gogoproto/proto" + types "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +// StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. +type StagedFinalizeBlockEvent struct { + // Contains one of StreamOrderbookFill, StreamSubaccountUpdate. + // + // Types that are valid to be assigned to Event: + // *StagedFinalizeBlockEvent_OrderFill + // *StagedFinalizeBlockEvent_SubaccountUpdate + Event isStagedFinalizeBlockEvent_Event `protobuf_oneof:"event"` +} + +func (m *StagedFinalizeBlockEvent) Reset() { *m = StagedFinalizeBlockEvent{} } +func (m *StagedFinalizeBlockEvent) String() string { return proto.CompactTextString(m) } +func (*StagedFinalizeBlockEvent) ProtoMessage() {} +func (*StagedFinalizeBlockEvent) Descriptor() ([]byte, []int) { + return fileDescriptor_cecf6ffcf2554dee, []int{0} +} +func (m *StagedFinalizeBlockEvent) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *StagedFinalizeBlockEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_StagedFinalizeBlockEvent.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *StagedFinalizeBlockEvent) XXX_Merge(src proto.Message) { + xxx_messageInfo_StagedFinalizeBlockEvent.Merge(m, src) +} +func (m *StagedFinalizeBlockEvent) XXX_Size() int { + return m.Size() +} +func (m *StagedFinalizeBlockEvent) XXX_DiscardUnknown() { + xxx_messageInfo_StagedFinalizeBlockEvent.DiscardUnknown(m) +} + +var xxx_messageInfo_StagedFinalizeBlockEvent proto.InternalMessageInfo + +type isStagedFinalizeBlockEvent_Event interface { + isStagedFinalizeBlockEvent_Event() + MarshalTo([]byte) (int, error) + Size() int +} + +type StagedFinalizeBlockEvent_OrderFill struct { + OrderFill *StreamOrderbookFill `protobuf:"bytes,1,opt,name=order_fill,json=orderFill,proto3,oneof" json:"order_fill,omitempty"` +} +type StagedFinalizeBlockEvent_SubaccountUpdate struct { + SubaccountUpdate *types.StreamSubaccountUpdate `protobuf:"bytes,2,opt,name=subaccount_update,json=subaccountUpdate,proto3,oneof" json:"subaccount_update,omitempty"` +} + +func (*StagedFinalizeBlockEvent_OrderFill) isStagedFinalizeBlockEvent_Event() {} +func (*StagedFinalizeBlockEvent_SubaccountUpdate) isStagedFinalizeBlockEvent_Event() {} + +func (m *StagedFinalizeBlockEvent) GetEvent() isStagedFinalizeBlockEvent_Event { + if m != nil { + return m.Event + } + return nil +} + +func (m *StagedFinalizeBlockEvent) GetOrderFill() *StreamOrderbookFill { + if x, ok := m.GetEvent().(*StagedFinalizeBlockEvent_OrderFill); ok { + return x.OrderFill + } + return nil +} + +func (m *StagedFinalizeBlockEvent) GetSubaccountUpdate() *types.StreamSubaccountUpdate { + if x, ok := m.GetEvent().(*StagedFinalizeBlockEvent_SubaccountUpdate); ok { + return x.SubaccountUpdate + } + return nil +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*StagedFinalizeBlockEvent) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*StagedFinalizeBlockEvent_OrderFill)(nil), + (*StagedFinalizeBlockEvent_SubaccountUpdate)(nil), + } +} + +func init() { + proto.RegisterType((*StagedFinalizeBlockEvent)(nil), "dydxprotocol.clob.StagedFinalizeBlockEvent") +} + +func init() { proto.RegisterFile("dydxprotocol/clob/streaming.proto", fileDescriptor_cecf6ffcf2554dee) } + +var fileDescriptor_cecf6ffcf2554dee = []byte{ + // 281 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x52, 0x4c, 0xa9, 0x4c, 0xa9, + 0x28, 0x28, 0xca, 0x2f, 0xc9, 0x4f, 0xce, 0xcf, 0xd1, 0x4f, 0xce, 0xc9, 0x4f, 0xd2, 0x2f, 0x2e, + 0x29, 0x4a, 0x4d, 0xcc, 0xcd, 0xcc, 0x4b, 0xd7, 0x03, 0x8b, 0x0b, 0x09, 0x22, 0x2b, 0xd1, 0x03, + 0x29, 0x91, 0xd2, 0x40, 0xd1, 0x55, 0x5c, 0x9a, 0x94, 0x98, 0x9c, 0x9c, 0x5f, 0x9a, 0x57, 0x52, + 0x8c, 0xae, 0x59, 0x4a, 0x16, 0xd3, 0xfc, 0xc2, 0xd2, 0xd4, 0xa2, 0x4a, 0x88, 0xb4, 0xd2, 0x59, + 0x46, 0x2e, 0x89, 0xe0, 0x92, 0xc4, 0xf4, 0xd4, 0x14, 0xb7, 0xcc, 0xbc, 0xc4, 0x9c, 0xcc, 0xaa, + 0x54, 0xa7, 0x9c, 0xfc, 0xe4, 0x6c, 0xd7, 0xb2, 0xd4, 0xbc, 0x12, 0x21, 0x77, 0x2e, 0xae, 0xfc, + 0xa2, 0x94, 0xd4, 0xa2, 0xf8, 0xb4, 0xcc, 0x9c, 0x1c, 0x09, 0x46, 0x05, 0x46, 0x0d, 0x6e, 0x23, + 0x35, 0x3d, 0x0c, 0xd7, 0xe8, 0x05, 0x83, 0xed, 0xf4, 0x07, 0x29, 0x4d, 0xca, 0xcf, 0xcf, 0x76, + 0xcb, 0xcc, 0xc9, 0xf1, 0x60, 0x08, 0xe2, 0x04, 0xeb, 0x05, 0x71, 0x84, 0xe2, 0xb9, 0x04, 0x11, + 0x6e, 0x8c, 0x2f, 0x2d, 0x48, 0x49, 0x2c, 0x49, 0x95, 0x60, 0x02, 0x9b, 0x67, 0x80, 0x6a, 0x1e, + 0x92, 0x57, 0xa0, 0xc6, 0x06, 0xc3, 0x45, 0x42, 0xc1, 0xfa, 0x3c, 0x18, 0x82, 0x04, 0x8a, 0xd1, + 0xc4, 0x9c, 0xd8, 0xb9, 0x58, 0x53, 0x41, 0x4e, 0x76, 0x0a, 0x38, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, + 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0xb8, 0xf0, 0x58, 0x8e, 0xe1, + 0xc6, 0x63, 0x39, 0x86, 0x28, 0xb3, 0xf4, 0xcc, 0x92, 0x8c, 0xd2, 0x24, 0xbd, 0xe4, 0xfc, 0x5c, + 0x7d, 0x94, 0x30, 0x29, 0x33, 0xd1, 0x4d, 0xce, 0x48, 0xcc, 0xcc, 0xd3, 0x87, 0x8b, 0x54, 0x40, + 0xc2, 0xa9, 0xa4, 0xb2, 0x20, 0xb5, 0x38, 0x89, 0x0d, 0x2c, 0x6c, 0x0c, 0x08, 0x00, 0x00, 0xff, + 0xff, 0x65, 0x71, 0xd8, 0xa8, 0xa9, 0x01, 0x00, 0x00, +} + +func (m *StagedFinalizeBlockEvent) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *StagedFinalizeBlockEvent) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StagedFinalizeBlockEvent) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Event != nil { + { + size := m.Event.Size() + i -= size + if _, err := m.Event.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + } + } + return len(dAtA) - i, nil +} + +func (m *StagedFinalizeBlockEvent_OrderFill) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StagedFinalizeBlockEvent_OrderFill) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.OrderFill != nil { + { + size, err := m.OrderFill.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintStreaming(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} +func (m *StagedFinalizeBlockEvent_SubaccountUpdate) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StagedFinalizeBlockEvent_SubaccountUpdate) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.SubaccountUpdate != nil { + { + size, err := m.SubaccountUpdate.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintStreaming(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + return len(dAtA) - i, nil +} +func encodeVarintStreaming(dAtA []byte, offset int, v uint64) int { + offset -= sovStreaming(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *StagedFinalizeBlockEvent) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Event != nil { + n += m.Event.Size() + } + return n +} + +func (m *StagedFinalizeBlockEvent_OrderFill) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.OrderFill != nil { + l = m.OrderFill.Size() + n += 1 + l + sovStreaming(uint64(l)) + } + return n +} +func (m *StagedFinalizeBlockEvent_SubaccountUpdate) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.SubaccountUpdate != nil { + l = m.SubaccountUpdate.Size() + n += 1 + l + sovStreaming(uint64(l)) + } + return n +} + +func sovStreaming(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozStreaming(x uint64) (n int) { + return sovStreaming(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *StagedFinalizeBlockEvent) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStreaming + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StagedFinalizeBlockEvent: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StagedFinalizeBlockEvent: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field OrderFill", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStreaming + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStreaming + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStreaming + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &StreamOrderbookFill{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Event = &StagedFinalizeBlockEvent_OrderFill{v} + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SubaccountUpdate", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStreaming + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStreaming + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStreaming + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &types.StreamSubaccountUpdate{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Event = &StagedFinalizeBlockEvent_SubaccountUpdate{v} + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipStreaming(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthStreaming + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipStreaming(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowStreaming + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowStreaming + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowStreaming + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthStreaming + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupStreaming + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthStreaming + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthStreaming = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowStreaming = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupStreaming = fmt.Errorf("proto: unexpected end of group") +) diff --git a/protocol/x/subaccounts/keeper/subaccount.go b/protocol/x/subaccounts/keeper/subaccount.go index a2fb54aacb..5eb650cce0 100644 --- a/protocol/x/subaccounts/keeper/subaccount.go +++ b/protocol/x/subaccounts/keeper/subaccount.go @@ -445,11 +445,9 @@ func (k Keeper) UpdateSubaccounts( if lib.IsDeliverTxMode(ctx) && k.GetFullNodeStreamingManager().Enabled() { if k.GetFullNodeStreamingManager().TracksSubaccountId(*u.SettledSubaccount.Id) { subaccountUpdate := GenerateStreamSubaccountUpdate(u, fundingPayments) - k.SendFinalizedSubaccountUpdates( + k.GetFullNodeStreamingManager().StageFinalizeBlockSubaccountUpdate( ctx, - []types.StreamSubaccountUpdate{ - subaccountUpdate, - }, + subaccountUpdate, ) } }