From 4ee297453d7b25ea3308184b5b2bad0f23d06344 Mon Sep 17 00:00:00 2001 From: Teddy Ding Date: Mon, 30 Sep 2024 14:38:39 -0400 Subject: [PATCH] Internalize logic to stage FinalizeBlock events (#2399) (cherry picked from commit d583dbc1da9c9cad09c7160f7a1e2f683c62bb77) --- .../codegen/dydxprotocol/clob/streaming.ts | 16 +- proto/dydxprotocol/clob/streaming.proto | 1 + protocol/mocks/MemClobKeeper.go | 6 +- .../streaming/full_node_streaming_manager.go | 304 ++++++++++-------- protocol/streaming/noop_streaming_manager.go | 21 +- protocol/streaming/types/interface.go | 19 +- protocol/streaming/util/util.go | 8 +- protocol/testutil/memclob/keeper.go | 4 +- protocol/x/clob/keeper/keeper.go | 22 +- protocol/x/clob/keeper/process_operations.go | 16 +- protocol/x/clob/memclob/memclob.go | 2 +- protocol/x/clob/types/mem_clob_keeper.go | 4 +- protocol/x/clob/types/streaming.pb.go | 100 +++++- protocol/x/subaccounts/keeper/subaccount.go | 2 +- 14 files changed, 317 insertions(+), 208 deletions(-) diff --git a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/streaming.ts b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/streaming.ts index dab8f1e122..1600c2e39c 100644 --- a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/streaming.ts +++ b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/streaming.ts @@ -1,4 +1,4 @@ -import { StreamOrderbookFill, StreamOrderbookFillSDKType } from "./query"; +import { StreamOrderbookFill, StreamOrderbookFillSDKType, StreamOrderbookUpdate, StreamOrderbookUpdateSDKType } from "./query"; import { StreamSubaccountUpdate, StreamSubaccountUpdateSDKType } from "../subaccounts/streaming"; import * as _m0 from "protobufjs/minimal"; import { DeepPartial } from "../../helpers"; @@ -7,18 +7,21 @@ import { DeepPartial } from "../../helpers"; export interface StagedFinalizeBlockEvent { orderFill?: StreamOrderbookFill; subaccountUpdate?: StreamSubaccountUpdate; + orderbookUpdate?: StreamOrderbookUpdate; } /** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */ export interface StagedFinalizeBlockEventSDKType { order_fill?: StreamOrderbookFillSDKType; subaccount_update?: StreamSubaccountUpdateSDKType; + orderbook_update?: StreamOrderbookUpdateSDKType; } function createBaseStagedFinalizeBlockEvent(): StagedFinalizeBlockEvent { return { orderFill: undefined, - subaccountUpdate: undefined + subaccountUpdate: undefined, + orderbookUpdate: undefined }; } @@ -32,6 +35,10 @@ export const StagedFinalizeBlockEvent = { StreamSubaccountUpdate.encode(message.subaccountUpdate, writer.uint32(18).fork()).ldelim(); } + if (message.orderbookUpdate !== undefined) { + StreamOrderbookUpdate.encode(message.orderbookUpdate, writer.uint32(26).fork()).ldelim(); + } + return writer; }, @@ -52,6 +59,10 @@ export const StagedFinalizeBlockEvent = { message.subaccountUpdate = StreamSubaccountUpdate.decode(reader, reader.uint32()); break; + case 3: + message.orderbookUpdate = StreamOrderbookUpdate.decode(reader, reader.uint32()); + break; + default: reader.skipType(tag & 7); break; @@ -65,6 +76,7 @@ export const StagedFinalizeBlockEvent = { const message = createBaseStagedFinalizeBlockEvent(); message.orderFill = object.orderFill !== undefined && object.orderFill !== null ? StreamOrderbookFill.fromPartial(object.orderFill) : undefined; message.subaccountUpdate = object.subaccountUpdate !== undefined && object.subaccountUpdate !== null ? StreamSubaccountUpdate.fromPartial(object.subaccountUpdate) : undefined; + message.orderbookUpdate = object.orderbookUpdate !== undefined && object.orderbookUpdate !== null ? StreamOrderbookUpdate.fromPartial(object.orderbookUpdate) : undefined; return message; } diff --git a/proto/dydxprotocol/clob/streaming.proto b/proto/dydxprotocol/clob/streaming.proto index 06c74ffbe1..ae3811134e 100644 --- a/proto/dydxprotocol/clob/streaming.proto +++ b/proto/dydxprotocol/clob/streaming.proto @@ -12,5 +12,6 @@ message StagedFinalizeBlockEvent { oneof event { StreamOrderbookFill order_fill = 1; dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2; + StreamOrderbookUpdate orderbook_update = 3; } } diff --git a/protocol/mocks/MemClobKeeper.go b/protocol/mocks/MemClobKeeper.go index 12a2f8cff3..5d8d68d42f 100644 --- a/protocol/mocks/MemClobKeeper.go +++ b/protocol/mocks/MemClobKeeper.go @@ -415,9 +415,9 @@ func (_m *MemClobKeeper) ReplayPlaceOrder(ctx types.Context, msg *clobtypes.MsgP return r0, r1, r2, r3 } -// SendOrderbookFillUpdates provides a mock function with given fields: ctx, orderbookFills -func (_m *MemClobKeeper) SendOrderbookFillUpdates(ctx types.Context, orderbookFills []clobtypes.StreamOrderbookFill) { - _m.Called(ctx, orderbookFills) +// SendOrderbookFillUpdate provides a mock function with given fields: ctx, orderbookFills +func (_m *MemClobKeeper) SendOrderbookFillUpdate(ctx types.Context, orderbookFill clobtypes.StreamOrderbookFill) { + _m.Called(ctx, orderbookFill) } // SendOrderbookUpdates provides a mock function with given fields: ctx, offchainUpdates diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index c43a254e78..85c265f12e 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -20,6 +20,8 @@ import ( "github.com/dydxprotocol/v4-chain/protocol/streaming/types" streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" + + ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" ) var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil) @@ -313,10 +315,7 @@ func toOrderbookStreamUpdate( blockHeight uint32, execMode sdk.ExecMode, ) []clobtypes.StreamUpdate { - v1updates, err := streaming_util.GetOffchainUpdatesV1(offchainUpdates) - if err != nil { - panic(err) - } + v1updates := streaming_util.GetOffchainUpdatesV1(offchainUpdates) return []clobtypes.StreamUpdate{ { UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ @@ -390,39 +389,22 @@ func getStagedEventsCount(store storetypes.KVStore) uint32 { return binary.BigEndian.Uint32(countsBytes) } -// Stage a subaccount update event in transient store, during `FinalizeBlock`. -func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockSubaccountUpdate( +// Send a subaccount update event. +func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, ) { - lib.AssertDeliverTxMode(ctx) - stagedEvent := clobtypes.StagedFinalizeBlockEvent{ - Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{ - SubaccountUpdate: &subaccountUpdate, - }, + // If not `DeliverTx`, return since we don't stream optimistic subaccount updates. + if !lib.IsDeliverTxMode(ctx) { + return } - sm.stageFinalizeBlockEvent( - ctx, - sm.cdc.MustMarshal(&stagedEvent), - ) -} -// Stage a fill event in transient store, during `FinalizeBlock`. -// Since `FinalizeBlock` code block can be called more than once with optimistic -// execution (once optimistically and optionally once on the canonical block), -// we need to stage the events in transient store and later emit them -// during `Precommit`. -func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill( - ctx sdk.Context, - fill clobtypes.StreamOrderbookFill, -) { - lib.AssertDeliverTxMode(ctx) + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block. stagedEvent := clobtypes.StagedFinalizeBlockEvent{ - Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{ - OrderFill: &fill, + Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{ + SubaccountUpdate: &subaccountUpdate, }, } - sm.stageFinalizeBlockEvent( ctx, sm.cdc.MustMarshal(&stagedEvent), @@ -501,29 +483,49 @@ func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes. } func getStreamUpdatesFromOffchainUpdates( - offchainUpdates *clobtypes.OffchainUpdates, + v1updates []ocutypes.OffChainUpdateV1, blockHeight uint32, execMode sdk.ExecMode, ) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) { // Group updates by clob pair id. - updates := make(map[uint32]*clobtypes.OffchainUpdates) - for _, message := range offchainUpdates.Messages { - clobPairId := message.OrderId.ClobPairId - if _, ok := updates[clobPairId]; !ok { - updates[clobPairId] = clobtypes.NewOffchainUpdates() + clobPairIdToV1Updates := make(map[uint32][]ocutypes.OffChainUpdateV1) + // unique list of clob pair Ids to send updates for. + clobPairIds = make([]uint32, 0) + for _, v1update := range v1updates { + var clobPairId uint32 + switch u := v1update.UpdateMessage.(type) { + case *ocutypes.OffChainUpdateV1_OrderPlace: + clobPairId = u.OrderPlace.Order.OrderId.ClobPairId + case *ocutypes.OffChainUpdateV1_OrderReplace: + clobPairId = u.OrderReplace.OldOrderId.ClobPairId + case *ocutypes.OffChainUpdateV1_OrderRemove: + clobPairId = u.OrderRemove.RemovedOrderId.ClobPairId + case *ocutypes.OffChainUpdateV1_OrderUpdate: + clobPairId = u.OrderUpdate.OrderId.ClobPairId + default: + panic(fmt.Sprintf("Unhandled UpdateMessage type: %v", u)) } - updates[clobPairId].Messages = append(updates[clobPairId].Messages, message) + + if _, ok := clobPairIdToV1Updates[clobPairId]; !ok { + clobPairIdToV1Updates[clobPairId] = []ocutypes.OffChainUpdateV1{} + clobPairIds = append(clobPairIds, clobPairId) + } + clobPairIdToV1Updates[clobPairId] = append(clobPairIdToV1Updates[clobPairId], v1update) } // Unmarshal each per-clob pair message to v1 updates. - streamUpdates = make([]clobtypes.StreamUpdate, 0) - clobPairIds = make([]uint32, 0) - for clobPairId, update := range updates { - v1updates, err := streaming_util.GetOffchainUpdatesV1(update) - if err != nil { - panic(err) + streamUpdates = make([]clobtypes.StreamUpdate, len(clobPairIds)) + + for i, clobPairId := range clobPairIds { + v1updates, exists := clobPairIdToV1Updates[clobPairId] + if !exists { + panic(fmt.Sprintf( + "clob pair id %v not found in clobPairIdToV1Updates: %v", + clobPairId, + clobPairIdToV1Updates, + )) } - streamUpdate := clobtypes.StreamUpdate{ + streamUpdates[i] = clobtypes.StreamUpdate{ UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ Updates: v1updates, @@ -533,8 +535,6 @@ func getStreamUpdatesFromOffchainUpdates( BlockHeight: blockHeight, ExecMode: uint32(execMode), } - streamUpdates = append(streamUpdates, streamUpdate) - clobPairIds = append(clobPairIds, clobPairId) } return streamUpdates, clobPairIds @@ -544,18 +544,40 @@ func getStreamUpdatesFromOffchainUpdates( // sends messages to the subscribers. func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, - blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, ) { - defer metrics.ModuleMeasureSince( - metrics.FullNodeGrpc, - metrics.GrpcSendOrderbookUpdatesLatency, - time.Now(), - ) + v1updates := streaming_util.GetOffchainUpdatesV1(offchainUpdates) + + // If not `DeliverTx`, then updates are optimistic. Stream them directly. + if !lib.IsDeliverTxMode(ctx) { + defer metrics.ModuleMeasureSince( + metrics.FullNodeGrpc, + metrics.GrpcSendOrderbookUpdatesLatency, + time.Now(), + ) - streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(offchainUpdates, blockHeight, execMode) + streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates( + v1updates, + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), + ctx.ExecMode(), + ) + sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) + return + } - sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block. + stagedEvent := clobtypes.StagedFinalizeBlockEvent{ + Event: &clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: v1updates, + Snapshot: false, + }, + }, + } + sm.stageFinalizeBlockEvent( + ctx, + sm.cdc.MustMarshal(&stagedEvent), + ) } func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills( @@ -595,36 +617,52 @@ func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills( return streamUpdates, clobPairIds } -// SendOrderbookFillUpdates groups fills by their clob pair ids and +// SendOrderbookFillUpdate groups fills by their clob pair ids and // sends messages to the subscribers. -func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( - orderbookFills []clobtypes.StreamOrderbookFill, - blockHeight uint32, - execMode sdk.ExecMode, +func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( + orderbookFill clobtypes.StreamOrderbookFill, + ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) { - defer metrics.ModuleMeasureSince( - metrics.FullNodeGrpc, - metrics.GrpcSendOrderbookFillsLatency, - time.Now(), - ) + // If not `DeliverTx`, then updates are optimistic. Stream them directly. + if !lib.IsDeliverTxMode(ctx) { + defer metrics.ModuleMeasureSince( + metrics.FullNodeGrpc, + metrics.GrpcSendOrderbookFillsLatency, + time.Now(), + ) - streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills( - orderbookFills, - blockHeight, - execMode, - perpetualIdToClobPairId, - ) + streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills( + []clobtypes.StreamOrderbookFill{orderbookFill}, + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), + ctx.ExecMode(), + perpetualIdToClobPairId, + ) + sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) + return + } + + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block. + stagedEvent := clobtypes.StagedFinalizeBlockEvent{ + Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{ + OrderFill: &orderbookFill, + }, + } - sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) + sm.stageFinalizeBlockEvent( + ctx, + sm.cdc.MustMarshal(&stagedEvent), + ) } // SendTakerOrderStatus sends out a taker order and its status to the full node streaming service. func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus( streamTakerOrder clobtypes.StreamTakerOrder, - blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, ) { + // In current design, we never send this during `DeliverTx` (`FinalizeBlock`). + lib.AssertCheckTxMode(ctx) + clobPairId := uint32(0) if liqOrder := streamTakerOrder.GetLiquidationOrder(); liqOrder != nil { clobPairId = liqOrder.ClobPairId @@ -639,8 +677,8 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus( UpdateMessage: &clobtypes.StreamUpdate_TakerOrder{ TakerOrder: &streamTakerOrder, }, - BlockHeight: blockHeight, - ExecMode: uint32(execMode), + BlockHeight: lib.MustConvertIntegerToUint32(ctx.BlockHeight()), + ExecMode: uint32(ctx.ExecMode()), }, }, []uint32{clobPairId}, @@ -712,13 +750,7 @@ func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache( float32(len(updates)), ) - sm.streamUpdateCache = append(sm.streamUpdateCache, updates...) - for _, clobPairId := range clobPairIds { - sm.streamUpdateSubscriptionCache = append( - sm.streamUpdateSubscriptionCache, - sm.clobPairIdToSubscriptionIdMapping[clobPairId], - ) - } + sm.cacheStreamUpdatesByClobPairWithLock(updates, clobPairIds) // Remove all subscriptions and wipe the buffer if buffer overflows. sm.RemoveSubscriptionsAndClearBufferIfFull() @@ -739,13 +771,8 @@ func (sm *FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache( float32(len(updates)), ) - sm.streamUpdateCache = append(sm.streamUpdateCache, updates...) - for _, subaccountId := range subaccountIds { - sm.streamUpdateSubscriptionCache = append( - sm.streamUpdateSubscriptionCache, - sm.subaccountIdToSubscriptionIdMapping[*subaccountId], - ) - } + sm.cacheStreamUpdatesBySubaccountWithLock(updates, subaccountIds) + sm.RemoveSubscriptionsAndClearBufferIfFull() sm.EmitMetrics() } @@ -850,38 +877,31 @@ func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams( return ret } -// addBatchUpdatesToCacheWithLock adds batched updates to the cache. -// Used by `StreamBatchUpdatesAfterFinalizeBlock` to batch orderbook, fill -// and subaccount updates in a single stream. -// Note this method requires the lock and assumes that the lock has already been +// cacheStreamUpdatesByClobPairWithLock adds stream updates to cache, +// and store corresponding clob pair Ids. +// This method requires the lock and assumes that the lock has already been // acquired by the caller. -func (sm *FullNodeStreamingManagerImpl) addBatchUpdatesToCacheWithLock( - orderbookStreamUpdates []clobtypes.StreamUpdate, - orderbookClobPairIds []uint32, - fillStreamUpdates []clobtypes.StreamUpdate, - fillClobPairIds []uint32, - subaccountStreamUpdates []clobtypes.StreamUpdate, - subaccountIds []*satypes.SubaccountId, +func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByClobPairWithLock( + streamUpdates []clobtypes.StreamUpdate, + clobPairIds []uint32, ) { - // Add orderbook updates to cache. - sm.streamUpdateCache = append(sm.streamUpdateCache, orderbookStreamUpdates...) - for _, clobPairId := range orderbookClobPairIds { - sm.streamUpdateSubscriptionCache = append( - sm.streamUpdateSubscriptionCache, - sm.clobPairIdToSubscriptionIdMapping[clobPairId], - ) - } - - // Add fill updates to cache. - sm.streamUpdateCache = append(sm.streamUpdateCache, fillStreamUpdates...) - for _, clobPairId := range fillClobPairIds { + sm.streamUpdateCache = append(sm.streamUpdateCache, streamUpdates...) + for _, clobPairId := range clobPairIds { sm.streamUpdateSubscriptionCache = append( sm.streamUpdateSubscriptionCache, sm.clobPairIdToSubscriptionIdMapping[clobPairId], ) } +} - // Add subaccount updates to cache. +// cacheStreamUpdatesBySubaccountWithLock adds subaccount stream updates to cache, +// and store corresponding subaccount Ids. +// This method requires the lock and assumes that the lock has already been +// acquired by the caller. +func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesBySubaccountWithLock( + subaccountStreamUpdates []clobtypes.StreamUpdate, + subaccountIds []*satypes.SubaccountId, +) { sm.streamUpdateCache = append(sm.streamUpdateCache, subaccountStreamUpdates...) for _, subaccountId := range subaccountIds { sm.streamUpdateSubscriptionCache = append( @@ -902,41 +922,50 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( // Prevent gas metering from state read. ctx = ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter()) - finalizedFills, finalizedSubaccountUpdates := sm.getStagedEventsFromFinalizeBlock(ctx) + finalizedFills, + finalizedSubaccountUpdates, + finalizedOrderbookUpdates := sm.getStagedEventsFromFinalizeBlock(ctx) + + sm.Lock() + defer sm.Unlock() + + // Flush all pending updates, since we want the onchain updates to arrive in a batch. + sm.FlushStreamUpdatesWithLock() - orderbookStreamUpdates, orderbookClobPairIds := getStreamUpdatesFromOffchainUpdates( - orderBookUpdatesToSyncLocalOpsQueue, - uint32(ctx.BlockHeight()), + // Cache updates to sync local ops queue + sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates( + streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue), + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), ) + sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds) + // Cache updates for finalized fills. fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills( finalizedFills, - uint32(ctx.BlockHeight()), + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), perpetualIdToClobPairId, ) + sm.cacheStreamUpdatesByClobPairWithLock(fillStreamUpdates, fillClobPairIds) + + // Cache updates for finalized orderbook updates (e.g. RemoveOrderFillAmount in `EndBlocker`). + for _, finalizedUpdate := range finalizedOrderbookUpdates { + streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates( + finalizedUpdate.Updates, + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), + ctx.ExecMode(), + ) + sm.cacheStreamUpdatesByClobPairWithLock(streamUpdates, clobPairIds) + } + // Finally, cache updates for finalized subaccount updates subaccountStreamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates( finalizedSubaccountUpdates, - uint32(ctx.BlockHeight()), + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), ) - - sm.Lock() - defer sm.Unlock() - - // Flush all pending updates, since we want the onchain updates to arrive in a batch. - sm.FlushStreamUpdatesWithLock() - - sm.addBatchUpdatesToCacheWithLock( - orderbookStreamUpdates, - orderbookClobPairIds, - fillStreamUpdates, - fillClobPairIds, - subaccountStreamUpdates, - subaccountIds, - ) + sm.cacheStreamUpdatesBySubaccountWithLock(subaccountStreamUpdates, subaccountIds) // Emit all stream updates in a single batch. // Note we still have the lock, which is released right before function returns. @@ -950,6 +979,7 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock( ) ( finalizedFills []clobtypes.StreamOrderbookFill, finalizedSubaccountUpdates []satypes.StreamSubaccountUpdate, + finalizedOrderbookUpdates []clobtypes.StreamOrderbookUpdate, ) { // Get onchain stream events stored in transient store. stagedEvents := sm.GetStagedFinalizeBlockEvents(ctx) @@ -965,6 +995,10 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock( finalizedFills = append(finalizedFills, *event.OrderFill) case *clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate: finalizedSubaccountUpdates = append(finalizedSubaccountUpdates, *event.SubaccountUpdate) + case *clobtypes.StagedFinalizeBlockEvent_OrderbookUpdate: + finalizedOrderbookUpdates = append(finalizedOrderbookUpdates, *event.OrderbookUpdate) + default: + panic(fmt.Sprintf("Unhandled staged event type: %v\n", stagedEvent.Event)) } } @@ -977,7 +1011,7 @@ func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock( float32(len(finalizedFills)), ) - return finalizedFills, finalizedSubaccountUpdates + return finalizedFills, finalizedSubaccountUpdates, finalizedOrderbookUpdates } func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams( diff --git a/protocol/streaming/noop_streaming_manager.go b/protocol/streaming/noop_streaming_manager.go index 4df60bc427..9dc7bf6de9 100644 --- a/protocol/streaming/noop_streaming_manager.go +++ b/protocol/streaming/noop_streaming_manager.go @@ -31,23 +31,20 @@ func (sm *NoopGrpcStreamingManager) Subscribe( func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( updates *clobtypes.OffchainUpdates, - blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, ) { } -func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates( - orderbookFills []clobtypes.StreamOrderbookFill, - blockHeight uint32, - execMode sdk.ExecMode, +func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdate( + orderbookFill clobtypes.StreamOrderbookFill, + ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) { } func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus( takerOrder clobtypes.StreamTakerOrder, - blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, ) { } @@ -79,19 +76,13 @@ func (sm *NoopGrpcStreamingManager) InitializeNewStreams( func (sm *NoopGrpcStreamingManager) Stop() { } -func (sm *NoopGrpcStreamingManager) StageFinalizeBlockFill( - ctx sdk.Context, - fill clobtypes.StreamOrderbookFill, -) { -} - func (sm *NoopGrpcStreamingManager) GetStagedFinalizeBlockEvents( ctx sdk.Context, ) []clobtypes.StagedFinalizeBlockEvent { return nil } -func (sm *NoopGrpcStreamingManager) StageFinalizeBlockSubaccountUpdate( +func (sm *NoopGrpcStreamingManager) SendSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, ) { diff --git a/protocol/streaming/types/interface.go b/protocol/streaming/types/interface.go index 0f097d3e75..5b42864016 100644 --- a/protocol/streaming/types/interface.go +++ b/protocol/streaming/types/interface.go @@ -31,30 +31,23 @@ type FullNodeStreamingManager interface { ) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, - blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, ) - SendOrderbookFillUpdates( - orderbookFills []clobtypes.StreamOrderbookFill, - blockHeight uint32, - execMode sdk.ExecMode, + SendOrderbookFillUpdate( + orderbookFill clobtypes.StreamOrderbookFill, + ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) SendTakerOrderStatus( takerOrder clobtypes.StreamTakerOrder, - blockHeight uint32, - execMode sdk.ExecMode, + ctx sdk.Context, ) SendFinalizedSubaccountUpdates( subaccountUpdates []satypes.StreamSubaccountUpdate, blockHeight uint32, execMode sdk.ExecMode, ) - StageFinalizeBlockFill( - ctx sdk.Context, - fill clobtypes.StreamOrderbookFill, - ) - StageFinalizeBlockSubaccountUpdate( + SendSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, ) diff --git a/protocol/streaming/util/util.go b/protocol/streaming/util/util.go index 985a29ef33..bbf37e3340 100644 --- a/protocol/streaming/util/util.go +++ b/protocol/streaming/util/util.go @@ -1,21 +1,23 @@ package util import ( + "fmt" + "github.com/cosmos/gogoproto/proto" ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ) // GetOffchainUpdatesV1 unmarshals messages in offchain updates to OffchainUpdateV1. -func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) ([]ocutypes.OffChainUpdateV1, error) { +func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) []ocutypes.OffChainUpdateV1 { v1updates := make([]ocutypes.OffChainUpdateV1, 0) for _, message := range offchainUpdates.Messages { var update ocutypes.OffChainUpdateV1 err := proto.Unmarshal(message.Message.Value, &update) if err != nil { - return nil, err + panic(fmt.Sprintf("Failed to get OffchainUpdatesV1: %v", err)) } v1updates = append(v1updates, update) } - return v1updates, nil + return v1updates } diff --git a/protocol/testutil/memclob/keeper.go b/protocol/testutil/memclob/keeper.go index 22ba76acdd..376f6fb30a 100644 --- a/protocol/testutil/memclob/keeper.go +++ b/protocol/testutil/memclob/keeper.go @@ -508,9 +508,9 @@ func (f *FakeMemClobKeeper) SendOrderbookUpdates( ) { } -func (f *FakeMemClobKeeper) SendOrderbookFillUpdates( +func (f *FakeMemClobKeeper) SendOrderbookFillUpdate( ctx sdk.Context, - orderbookFills []types.StreamOrderbookFill, + orderbookFill types.StreamOrderbookFill, ) { } diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 8c39342ca6..f49eb61271 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -309,23 +309,18 @@ func (k Keeper) SendOrderbookUpdates( k.GetFullNodeStreamingManager().SendOrderbookUpdates( offchainUpdates, - lib.MustConvertIntegerToUint32(ctx.BlockHeight()), - ctx.ExecMode(), + ctx, ) } -// SendOrderbookFillUpdates sends the orderbook fills to the Full Node streaming manager. -func (k Keeper) SendOrderbookFillUpdates( +// SendOrderbookFillUpdate sends the orderbook fills to the Full Node streaming manager. +func (k Keeper) SendOrderbookFillUpdate( ctx sdk.Context, - orderbookFills []types.StreamOrderbookFill, + orderbookFill types.StreamOrderbookFill, ) { - if len(orderbookFills) == 0 { - return - } - k.GetFullNodeStreamingManager().SendOrderbookFillUpdates( - orderbookFills, - lib.MustConvertIntegerToUint32(ctx.BlockHeight()), - ctx.ExecMode(), + k.GetFullNodeStreamingManager().SendOrderbookFillUpdate( + orderbookFill, + ctx, k.PerpetualIdToClobPairId, ) } @@ -337,7 +332,6 @@ func (k Keeper) SendTakerOrderStatus( ) { k.GetFullNodeStreamingManager().SendTakerOrderStatus( takerOrder, - lib.MustConvertIntegerToUint32(ctx.BlockHeight()), - ctx.ExecMode(), + ctx, ) } diff --git a/protocol/x/clob/keeper/process_operations.go b/protocol/x/clob/keeper/process_operations.go index 026d9d316c..86808afb5d 100644 --- a/protocol/x/clob/keeper/process_operations.go +++ b/protocol/x/clob/keeper/process_operations.go @@ -560,9 +560,10 @@ func (k Keeper) PersistMatchOrdersToState( makerOrders, ) - k.GetFullNodeStreamingManager().StageFinalizeBlockFill( - ctx, + k.GetFullNodeStreamingManager().SendOrderbookFillUpdate( streamOrderbookFill, + ctx, + k.PerpetualIdToClobPairId, ) } @@ -670,9 +671,10 @@ func (k Keeper) PersistMatchLiquidationToState( takerOrder, makerOrders, ) - k.GetFullNodeStreamingManager().StageFinalizeBlockFill( - ctx, + k.GetFullNodeStreamingManager().SendOrderbookFillUpdate( streamOrderbookFill, + ctx, + k.PerpetualIdToClobPairId, ) } return nil @@ -843,11 +845,9 @@ func (k Keeper) PersistMatchDeleveragingToState( }, }, } - k.SendOrderbookFillUpdates( + k.SendOrderbookFillUpdate( ctx, - []types.StreamOrderbookFill{ - streamOrderbookFill, - }, + streamOrderbookFill, ) } } diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index d6d1e08774..db541650c3 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -402,7 +402,7 @@ func (m *MemClobPriceTimePriority) mustUpdateMemclobStateWithMatches( ) clobMatch := internalOperation.GetMatch() orderbookMatchFill := m.GenerateStreamOrderbookFill(ctx, *clobMatch, takerOrder, makerOrders) - m.clobKeeper.SendOrderbookFillUpdates(ctx, []types.StreamOrderbookFill{orderbookMatchFill}) + m.clobKeeper.SendOrderbookFillUpdate(ctx, orderbookMatchFill) } // Build a slice of all subaccounts which had matches this matching loop, and sort them for determinism. diff --git a/protocol/x/clob/types/mem_clob_keeper.go b/protocol/x/clob/types/mem_clob_keeper.go index d555367718..6e25cadf35 100644 --- a/protocol/x/clob/types/mem_clob_keeper.go +++ b/protocol/x/clob/types/mem_clob_keeper.go @@ -102,9 +102,9 @@ type MemClobKeeper interface { ctx sdk.Context, offchainUpdates *OffchainUpdates, ) - SendOrderbookFillUpdates( + SendOrderbookFillUpdate( ctx sdk.Context, - orderbookFills []StreamOrderbookFill, + orderbookFill StreamOrderbookFill, ) SendTakerOrderStatus( ctx sdk.Context, diff --git a/protocol/x/clob/types/streaming.pb.go b/protocol/x/clob/types/streaming.pb.go index 83b8db719a..1f6f552fb3 100644 --- a/protocol/x/clob/types/streaming.pb.go +++ b/protocol/x/clob/types/streaming.pb.go @@ -30,6 +30,7 @@ type StagedFinalizeBlockEvent struct { // Types that are valid to be assigned to Event: // *StagedFinalizeBlockEvent_OrderFill // *StagedFinalizeBlockEvent_SubaccountUpdate + // *StagedFinalizeBlockEvent_OrderbookUpdate Event isStagedFinalizeBlockEvent_Event `protobuf_oneof:"event"` } @@ -78,9 +79,13 @@ type StagedFinalizeBlockEvent_OrderFill struct { type StagedFinalizeBlockEvent_SubaccountUpdate struct { SubaccountUpdate *types.StreamSubaccountUpdate `protobuf:"bytes,2,opt,name=subaccount_update,json=subaccountUpdate,proto3,oneof" json:"subaccount_update,omitempty"` } +type StagedFinalizeBlockEvent_OrderbookUpdate struct { + OrderbookUpdate *StreamOrderbookUpdate `protobuf:"bytes,3,opt,name=orderbook_update,json=orderbookUpdate,proto3,oneof" json:"orderbook_update,omitempty"` +} func (*StagedFinalizeBlockEvent_OrderFill) isStagedFinalizeBlockEvent_Event() {} func (*StagedFinalizeBlockEvent_SubaccountUpdate) isStagedFinalizeBlockEvent_Event() {} +func (*StagedFinalizeBlockEvent_OrderbookUpdate) isStagedFinalizeBlockEvent_Event() {} func (m *StagedFinalizeBlockEvent) GetEvent() isStagedFinalizeBlockEvent_Event { if m != nil { @@ -103,11 +108,19 @@ func (m *StagedFinalizeBlockEvent) GetSubaccountUpdate() *types.StreamSubaccount return nil } +func (m *StagedFinalizeBlockEvent) GetOrderbookUpdate() *StreamOrderbookUpdate { + if x, ok := m.GetEvent().(*StagedFinalizeBlockEvent_OrderbookUpdate); ok { + return x.OrderbookUpdate + } + return nil +} + // XXX_OneofWrappers is for the internal use of the proto package. func (*StagedFinalizeBlockEvent) XXX_OneofWrappers() []interface{} { return []interface{}{ (*StagedFinalizeBlockEvent_OrderFill)(nil), (*StagedFinalizeBlockEvent_SubaccountUpdate)(nil), + (*StagedFinalizeBlockEvent_OrderbookUpdate)(nil), } } @@ -118,25 +131,26 @@ func init() { func init() { proto.RegisterFile("dydxprotocol/clob/streaming.proto", fileDescriptor_cecf6ffcf2554dee) } var fileDescriptor_cecf6ffcf2554dee = []byte{ - // 281 bytes of a gzipped FileDescriptorProto + // 303 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x52, 0x4c, 0xa9, 0x4c, 0xa9, 0x28, 0x28, 0xca, 0x2f, 0xc9, 0x4f, 0xce, 0xcf, 0xd1, 0x4f, 0xce, 0xc9, 0x4f, 0xd2, 0x2f, 0x2e, 0x29, 0x4a, 0x4d, 0xcc, 0xcd, 0xcc, 0x4b, 0xd7, 0x03, 0x8b, 0x0b, 0x09, 0x22, 0x2b, 0xd1, 0x03, 0x29, 0x91, 0xd2, 0x40, 0xd1, 0x55, 0x5c, 0x9a, 0x94, 0x98, 0x9c, 0x9c, 0x5f, 0x9a, 0x57, 0x52, - 0x8c, 0xae, 0x59, 0x4a, 0x16, 0xd3, 0xfc, 0xc2, 0xd2, 0xd4, 0xa2, 0x4a, 0x88, 0xb4, 0xd2, 0x59, - 0x46, 0x2e, 0x89, 0xe0, 0x92, 0xc4, 0xf4, 0xd4, 0x14, 0xb7, 0xcc, 0xbc, 0xc4, 0x9c, 0xcc, 0xaa, + 0x8c, 0xae, 0x59, 0x4a, 0x16, 0xd3, 0xfc, 0xc2, 0xd2, 0xd4, 0xa2, 0x4a, 0x88, 0xb4, 0xd2, 0x12, + 0x26, 0x2e, 0x89, 0xe0, 0x92, 0xc4, 0xf4, 0xd4, 0x14, 0xb7, 0xcc, 0xbc, 0xc4, 0x9c, 0xcc, 0xaa, 0x54, 0xa7, 0x9c, 0xfc, 0xe4, 0x6c, 0xd7, 0xb2, 0xd4, 0xbc, 0x12, 0x21, 0x77, 0x2e, 0xae, 0xfc, 0xa2, 0x94, 0xd4, 0xa2, 0xf8, 0xb4, 0xcc, 0x9c, 0x1c, 0x09, 0x46, 0x05, 0x46, 0x0d, 0x6e, 0x23, 0x35, 0x3d, 0x0c, 0xd7, 0xe8, 0x05, 0x83, 0xed, 0xf4, 0x07, 0x29, 0x4d, 0xca, 0xcf, 0xcf, 0x76, 0xcb, 0xcc, 0xc9, 0xf1, 0x60, 0x08, 0xe2, 0x04, 0xeb, 0x05, 0x71, 0x84, 0xe2, 0xb9, 0x04, 0x11, 0x6e, 0x8c, 0x2f, 0x2d, 0x48, 0x49, 0x2c, 0x49, 0x95, 0x60, 0x02, 0x9b, 0x67, 0x80, 0x6a, 0x1e, 0x92, 0x57, 0xa0, 0xc6, 0x06, 0xc3, 0x45, 0x42, 0xc1, 0xfa, 0x3c, 0x18, 0x82, 0x04, 0x8a, 0xd1, - 0xc4, 0x9c, 0xd8, 0xb9, 0x58, 0x53, 0x41, 0x4e, 0x76, 0x0a, 0x38, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, - 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0xb8, 0xf0, 0x58, 0x8e, 0xe1, - 0xc6, 0x63, 0x39, 0x86, 0x28, 0xb3, 0xf4, 0xcc, 0x92, 0x8c, 0xd2, 0x24, 0xbd, 0xe4, 0xfc, 0x5c, - 0x7d, 0x94, 0x30, 0x29, 0x33, 0xd1, 0x4d, 0xce, 0x48, 0xcc, 0xcc, 0xd3, 0x87, 0x8b, 0x54, 0x40, - 0xc2, 0xa9, 0xa4, 0xb2, 0x20, 0xb5, 0x38, 0x89, 0x0d, 0x2c, 0x6c, 0x0c, 0x08, 0x00, 0x00, 0xff, - 0xff, 0x65, 0x71, 0xd8, 0xa8, 0xa9, 0x01, 0x00, 0x00, + 0xc4, 0x84, 0x42, 0xb9, 0x04, 0xf2, 0x61, 0xd6, 0xc3, 0xcc, 0x67, 0x06, 0x9b, 0xaf, 0x41, 0xd8, + 0xbd, 0x70, 0x73, 0xf9, 0xf3, 0x51, 0x85, 0x9c, 0xd8, 0xb9, 0x58, 0x53, 0x41, 0x21, 0xe1, 0x14, + 0x70, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0x4e, 0x78, 0x2c, + 0xc7, 0x70, 0xe1, 0xb1, 0x1c, 0xc3, 0x8d, 0xc7, 0x72, 0x0c, 0x51, 0x66, 0xe9, 0x99, 0x25, 0x19, + 0xa5, 0x49, 0x7a, 0xc9, 0xf9, 0xb9, 0xfa, 0x28, 0x41, 0x5d, 0x66, 0xa2, 0x9b, 0x9c, 0x91, 0x98, + 0x99, 0xa7, 0x0f, 0x17, 0xa9, 0x80, 0x04, 0x7f, 0x49, 0x65, 0x41, 0x6a, 0x71, 0x12, 0x1b, 0x58, + 0xd8, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0xe1, 0xc0, 0xbc, 0x6c, 0x00, 0x02, 0x00, 0x00, } func (m *StagedFinalizeBlockEvent) Marshal() (dAtA []byte, err error) { @@ -213,6 +227,27 @@ func (m *StagedFinalizeBlockEvent_SubaccountUpdate) MarshalToSizedBuffer(dAtA [] } return len(dAtA) - i, nil } +func (m *StagedFinalizeBlockEvent_OrderbookUpdate) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StagedFinalizeBlockEvent_OrderbookUpdate) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.OrderbookUpdate != nil { + { + size, err := m.OrderbookUpdate.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintStreaming(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + return len(dAtA) - i, nil +} func encodeVarintStreaming(dAtA []byte, offset int, v uint64) int { offset -= sovStreaming(v) base := offset @@ -260,6 +295,18 @@ func (m *StagedFinalizeBlockEvent_SubaccountUpdate) Size() (n int) { } return n } +func (m *StagedFinalizeBlockEvent_OrderbookUpdate) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.OrderbookUpdate != nil { + l = m.OrderbookUpdate.Size() + n += 1 + l + sovStreaming(uint64(l)) + } + return n +} func sovStreaming(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 @@ -366,6 +413,41 @@ func (m *StagedFinalizeBlockEvent) Unmarshal(dAtA []byte) error { } m.Event = &StagedFinalizeBlockEvent_SubaccountUpdate{v} iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field OrderbookUpdate", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStreaming + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStreaming + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStreaming + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &StreamOrderbookUpdate{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Event = &StagedFinalizeBlockEvent_OrderbookUpdate{v} + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipStreaming(dAtA[iNdEx:]) diff --git a/protocol/x/subaccounts/keeper/subaccount.go b/protocol/x/subaccounts/keeper/subaccount.go index 5eb650cce0..e72ccd60e7 100644 --- a/protocol/x/subaccounts/keeper/subaccount.go +++ b/protocol/x/subaccounts/keeper/subaccount.go @@ -445,7 +445,7 @@ func (k Keeper) UpdateSubaccounts( if lib.IsDeliverTxMode(ctx) && k.GetFullNodeStreamingManager().Enabled() { if k.GetFullNodeStreamingManager().TracksSubaccountId(*u.SettledSubaccount.Id) { subaccountUpdate := GenerateStreamSubaccountUpdate(u, fundingPayments) - k.GetFullNodeStreamingManager().StageFinalizeBlockSubaccountUpdate( + k.GetFullNodeStreamingManager().SendSubaccountUpdate( ctx, subaccountUpdate, )