Skip to content

Commit

Permalink
coments
Browse files Browse the repository at this point in the history
  • Loading branch information
teddyding committed Sep 16, 2024
1 parent 976ba07 commit 5229884
Show file tree
Hide file tree
Showing 12 changed files with 889 additions and 798 deletions.
480 changes: 241 additions & 239 deletions indexer/packages/v4-protos/src/codegen/dydxprotocol/bundle.ts

Large diffs are not rendered by default.

67 changes: 0 additions & 67 deletions indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -319,18 +319,6 @@ export interface StreamUpdateSDKType {
taker_order?: StreamTakerOrderSDKType;
subaccount_update?: StreamSubaccountUpdateSDKType;
}
/** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */

export interface StagedFinalizeBlockEvent {
orderFill?: StreamOrderbookFill;
subaccountUpdate?: StreamSubaccountUpdate;
}
/** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */

export interface StagedFinalizeBlockEventSDKType {
order_fill?: StreamOrderbookFillSDKType;
subaccount_update?: StreamSubaccountUpdateSDKType;
}
/**
* StreamOrderbookUpdate provides information on an orderbook update. Used in
* the full node GRPC stream.
Expand Down Expand Up @@ -1408,61 +1396,6 @@ export const StreamUpdate = {

};

function createBaseStagedFinalizeBlockEvent(): StagedFinalizeBlockEvent {
return {
orderFill: undefined,
subaccountUpdate: undefined
};
}

export const StagedFinalizeBlockEvent = {
encode(message: StagedFinalizeBlockEvent, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.orderFill !== undefined) {
StreamOrderbookFill.encode(message.orderFill, writer.uint32(10).fork()).ldelim();
}

if (message.subaccountUpdate !== undefined) {
StreamSubaccountUpdate.encode(message.subaccountUpdate, writer.uint32(18).fork()).ldelim();
}

return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): StagedFinalizeBlockEvent {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseStagedFinalizeBlockEvent();

while (reader.pos < end) {
const tag = reader.uint32();

switch (tag >>> 3) {
case 1:
message.orderFill = StreamOrderbookFill.decode(reader, reader.uint32());
break;

case 2:
message.subaccountUpdate = StreamSubaccountUpdate.decode(reader, reader.uint32());
break;

default:
reader.skipType(tag & 7);
break;
}
}

return message;
},

fromPartial(object: DeepPartial<StagedFinalizeBlockEvent>): StagedFinalizeBlockEvent {
const message = createBaseStagedFinalizeBlockEvent();
message.orderFill = object.orderFill !== undefined && object.orderFill !== null ? StreamOrderbookFill.fromPartial(object.orderFill) : undefined;
message.subaccountUpdate = object.subaccountUpdate !== undefined && object.subaccountUpdate !== null ? StreamSubaccountUpdate.fromPartial(object.subaccountUpdate) : undefined;
return message;
}

};

function createBaseStreamOrderbookUpdate(): StreamOrderbookUpdate {
return {
snapshot: false,
Expand Down
4 changes: 2 additions & 2 deletions indexer/packages/v4-protos/src/codegen/gogoproto/bundle.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
import * as _122 from "./gogo";
export const gogoproto = { ..._122
import * as _123 from "./gogo";
export const gogoproto = { ..._123
};
22 changes: 11 additions & 11 deletions indexer/packages/v4-protos/src/codegen/google/bundle.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import * as _123 from "./api/annotations";
import * as _124 from "./api/http";
import * as _125 from "./protobuf/descriptor";
import * as _126 from "./protobuf/duration";
import * as _127 from "./protobuf/timestamp";
import * as _128 from "./protobuf/any";
import * as _124 from "./api/annotations";
import * as _125 from "./api/http";
import * as _126 from "./protobuf/descriptor";
import * as _127 from "./protobuf/duration";
import * as _128 from "./protobuf/timestamp";
import * as _129 from "./protobuf/any";
export namespace google {
export const api = { ..._123,
..._124
export const api = { ..._124,
..._125
};
export const protobuf = { ..._125,
..._126,
export const protobuf = { ..._126,
..._127,
..._128
..._128,
..._129
};
}
9 changes: 0 additions & 9 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,6 @@ message StreamUpdate {
}
}

// StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`.
message StagedFinalizeBlockEvent {
// Contains one of StreamOrderbookFill, StreamSubaccountUpdate.
oneof event {
StreamOrderbookFill order_fill = 1;
dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2;
}
}

// StreamOrderbookUpdate provides information on an orderbook update. Used in
// the full node GRPC stream.
message StreamOrderbookUpdate {
Expand Down
16 changes: 16 additions & 0 deletions proto/dydxprotocol/clob/streaming.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";
package dydxprotocol.clob;

import "dydxprotocol/subaccounts/streaming.proto";
import "dydxprotocol/clob/query.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/x/clob/types";

// StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`.
message StagedFinalizeBlockEvent {
// Contains one of StreamOrderbookFill, StreamSubaccountUpdate.
oneof event {
StreamOrderbookFill order_fill = 1;
dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2;
}
}
32 changes: 16 additions & 16 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,22 @@ const (
GateWithdrawalsIfNegativeTncSubaccountSeenLatency = "gate_withdrawals_if_negative_tnc_subaccount_seen_latency"

// Full node grpc
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency"
GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count"
GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count"
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
GrpcStreamSubscriberCount = "grpc_stream_subscriber_count"
GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered"
GrpcFlushUpdatesLatency = "grpc_flush_updates_latency"
GrpcSubscriptionChannelLength = "grpc_subscription_channel_length"
GrpcStagedAllFinalizeBlockUpdates = "grpc_staged_all_finalize_block_updates"
GrpcStagedFillFinalizeBlockUpdates = "grpc_staged_finalize_block_fill_updates"
GrpcStagedSubaccountFinalizeBlockUpdates = "grpc_staged_finalize_block_subaccount_updates"
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency"
GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count"
GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count"
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
GrpcStreamSubscriberCount = "grpc_stream_subscriber_count"
GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered"
GrpcFlushUpdatesLatency = "grpc_flush_updates_latency"
GrpcSubscriptionChannelLength = "grpc_subscription_channel_length"
GrpcStagedAllFinalizeBlockUpdatesCount = "grpc_staged_all_finalize_block_updates_count"
GrpcStagedFillFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_fill_updates_count"
GrpcStagedSubaccountFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_subaccount_updates_count"

EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
Expand Down
1 change: 1 addition & 0 deletions protocol/streaming/constants.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package streaming

// Constants for FullNodeStreamingManager.
const (
// Transient store key for storing staged events.
StreamingManagerTransientStoreKey = "tmp_streaming"
Expand Down
23 changes: 10 additions & 13 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"cosmossdk.io/log"
"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
Expand Down Expand Up @@ -404,6 +403,10 @@ func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockSubaccountUpdate(
}

// Stage a fill event in transient store, during `FinalizeBlock`.
// Since `FinalizeBlock` code block can be called more than once with optimistc
// execution (once optimistcally and optionally once on the canonical block),
// we need to stage the events in transient store and later emit them
// during `Precommit`.
func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill(
ctx sdk.Context,
fill clobtypes.StreamOrderbookFill,
Expand Down Expand Up @@ -841,11 +844,9 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock(
// Get onchain stream events stored in transient store.
stagedEvents := sm.GetStagedFinalizeBlockEvents(ctx)

telemetry.SetGauge(
metrics.SetGauge(
metrics.GrpcStagedAllFinalizeBlockUpdatesCount,
float32(len(stagedEvents)),
types.ModuleName,
metrics.GrpcStagedAllFinalizeBlockUpdates,
metrics.Count,
)

for _, stagedEvent := range stagedEvents {
Expand All @@ -857,17 +858,13 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock(
}
}

telemetry.SetGauge(
metrics.SetGauge(
metrics.GrpcStagedSubaccountFinalizeBlockUpdatesCount,
float32(len(finalizedSubaccountUpdates)),
types.ModuleName,
metrics.GrpcStagedSubaccountFinalizeBlockUpdates,
metrics.Count,
)
telemetry.SetGauge(
metrics.SetGauge(
metrics.GrpcStagedFillFinalizeBlockUpdatesCount,
float32(len(finalizedFills)),
types.ModuleName,
metrics.GrpcStagedFillFinalizeBlockUpdates,
metrics.Count,
)

return finalizedFills, finalizedSubaccountUpdates
Expand Down
1 change: 1 addition & 0 deletions protocol/x/clob/keeper/grpc_stream_finalize_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

// Returns the order updates needed to update the fill amount for the orders
// from local ops queue, according to the latest onchain state (after FinalizeBlock).
// Effectively reverts the optimistic fill amounts removed from the CheckTx to DeliverTx state transition.
func (k Keeper) getUpdatesToSyncLocalOpsQueue(
ctx sdk.Context,
) *types.OffchainUpdates {
Expand Down
Loading

0 comments on commit 5229884

Please sign in to comment.