Skip to content

Commit

Permalink
Add sql script latency metrics (backport #2356) (#2371)
Browse files Browse the repository at this point in the history
Co-authored-by: dydxwill <[email protected]>
  • Loading branch information
mergify[bot] and dydxwill authored Sep 26, 2024
1 parent 9875783 commit d034716
Show file tree
Hide file tree
Showing 22 changed files with 197 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,14 @@ describe('fundingHandler', () => {
}));
expect(stats.gauge).toHaveBeenCalledWith('ender.funding_index_update_event', 0.1, { ticker: 'BTC-USD' });
expect(stats.gauge).toHaveBeenCalledWith('ender.funding_index_update', 0.1, { ticker: 'BTC-USD' });
expect(stats.timing).toHaveBeenCalledWith(
'ender.handle_funding_event.sql_latency',
expect.any(Number),
{
className: 'FundingHandler',
eventType: 'FundingEvent',
},
);
});

it('successfully processes and clears cache for a new funding rate', async () => {
Expand Down
8 changes: 8 additions & 0 deletions indexer/services/ender/src/handlers/asset-handler.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { stats } from '@dydxprotocol-indexer/base';
import {
AssetFromDatabase,
AssetModel,
Expand All @@ -6,6 +7,7 @@ import {
import { AssetCreateEventV1 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../config';
import { ConsolidatedKafkaEvent } from '../lib/types';
import { Handler } from './handler';

Expand All @@ -18,6 +20,12 @@ export class AssetCreationHandler extends Handler<AssetCreateEventV1> {

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(resultRow: pg.QueryResultRow): Promise<ConsolidatedKafkaEvent[]> {
// Handle latency from resultRow
stats.timing(
`${config.SERVICE_NAME}.handle_asset_event.sql_latency`,
Number(resultRow.latency),
this.generateTimingStatsOptions(),
);
const asset: AssetFromDatabase = AssetModel.fromJson(
resultRow.asset) as AssetFromDatabase;
assetRefresher.addAsset(asset);
Expand Down
7 changes: 7 additions & 0 deletions indexer/services/ender/src/handlers/funding-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ export class FundingHandler extends Handler<FundingEventMessage> {
});
stats.increment(`${config.SERVICE_NAME}.handle_funding_event.failure`, 1);
}

// Handle latency from resultRow
stats.timing(
`${config.SERVICE_NAME}.handle_funding_event.sql_latency`,
Number(resultRow.latency),
this.generateTimingStatsOptions(),
);
}

await Promise.all(promises);
Expand Down
8 changes: 8 additions & 0 deletions indexer/services/ender/src/handlers/liquidity-tier-handler.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { stats } from '@dydxprotocol-indexer/base';
import {
LiquidityTiersFromDatabase,
LiquidityTiersModel,
Expand All @@ -9,6 +10,7 @@ import { LiquidityTierUpsertEventV1, LiquidityTierUpsertEventV2 } from '@dydxpro
import _ from 'lodash';
import * as pg from 'pg';

import config from '../config';
import { generatePerpetualMarketMessage } from '../helpers/kafka-helper';
import { ConsolidatedKafkaEvent } from '../lib/types';
import { Handler } from './handler';
Expand All @@ -21,6 +23,12 @@ export class LiquidityTierHandlerBase<T> extends Handler<T> {

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(resultRow: pg.QueryResultRow): Promise<ConsolidatedKafkaEvent[]> {
// Handle latency from resultRow
stats.timing(
`${config.SERVICE_NAME}.handle_liquidity_tier_event.sql_latency`,
Number(resultRow.latency),
this.generateTimingStatsOptions(),
);
const liquidityTier: LiquidityTiersFromDatabase = LiquidityTiersModel.fromJson(
resultRow.liquidity_tier,
) as LiquidityTiersFromDatabase;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { logger } from '@dydxprotocol-indexer/base';
import { logger, stats } from '@dydxprotocol-indexer/base';
import { MarketEventV1 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../../config';
import { ConsolidatedKafkaEvent } from '../../lib/types';
import { Handler } from '../handler';

Expand All @@ -14,12 +15,18 @@ export class MarketCreateHandler extends Handler<MarketEventV1> {
}

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(_: pg.QueryResultRow): Promise<ConsolidatedKafkaEvent[]> {
public async internalHandle(resultRow: pg.QueryResultRow): Promise<ConsolidatedKafkaEvent[]> {
logger.info({
at: 'MarketCreateHandler#handle',
message: 'Received MarketEvent with MarketCreate.',
event: this.event,
});
// Handle latency from resultRow
stats.timing(
`${config.SERVICE_NAME}.handle_market_create_event.sql_latency`,
Number(resultRow.latency),
this.generateTimingStatsOptions(),
);

return [];
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { logger } from '@dydxprotocol-indexer/base';
import { logger, stats } from '@dydxprotocol-indexer/base';
import { MarketEventV1 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../../config';
import { ConsolidatedKafkaEvent } from '../../lib/types';
import { Handler } from '../handler';

Expand All @@ -14,13 +15,20 @@ export class MarketModifyHandler extends Handler<MarketEventV1> {
}

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(_: pg.QueryResultRow): Promise<ConsolidatedKafkaEvent[]> {
public async internalHandle(resultRow: pg.QueryResultRow): Promise<ConsolidatedKafkaEvent[]> {
logger.info({
at: 'MarketModifyHandler#handle',
message: 'Received MarketEvent with MarketModify.',
event: this.event,
});

// Handle latency from resultRow
stats.timing(
`${config.SERVICE_NAME}.handle_market_modify_event.sql_latency`,
Number(resultRow.latency),
this.generateTimingStatsOptions(),
);

return [];
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { logger } from '@dydxprotocol-indexer/base';
import { logger, stats } from '@dydxprotocol-indexer/base';
import {
MarketFromDatabase,
OraclePriceFromDatabase,
Expand All @@ -8,6 +8,7 @@ import {
import { MarketEventV1 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../../config';
import { generateOraclePriceContents } from '../../helpers/kafka-helper';
import {
ConsolidatedKafkaEvent,
Expand Down Expand Up @@ -35,6 +36,13 @@ export class MarketPriceUpdateHandler extends Handler<MarketEventV1> {
const oraclePrice: OraclePriceFromDatabase = OraclePriceModel.fromJson(
resultRow.oracle_price) as OraclePriceFromDatabase;

// Handle latency from resultRow
stats.timing(
`${config.SERVICE_NAME}.handle_market_price_update_event.sql_latency`,
Number(resultRow.latency),
this.generateTimingStatsOptions(),
);

return [
this.generateKafkaEvent(
oraclePrice, market.pair,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { logger } from '@dydxprotocol-indexer/base';
import { logger, stats } from '@dydxprotocol-indexer/base';
import {
FillFromDatabase,
FillModel,
Expand All @@ -15,6 +15,7 @@ import {
import { DeleveragingEventV1 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../../config';
import { SUBACCOUNT_ORDER_FILL_EVENT_TYPE } from '../../constants';
import { annotateWithPnl, convertPerpetualPosition } from '../../helpers/kafka-helper';
import { ConsolidatedKafkaEvent } from '../../lib/types';
Expand Down Expand Up @@ -95,6 +96,12 @@ export class DeleveragingHandler extends AbstractOrderFillHandler<DeleveragingEv
liquidatedFill,
),
];
// Handle latency from resultRow
stats.timing(
`${config.SERVICE_NAME}.handle_deleveraging_event.sql_latency`,
Number(resultRow.latency),
this.generateTimingStatsOptions(),
);
return kafkaEvents;
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { stats } from '@dydxprotocol-indexer/base';
import {
FillFromDatabase,
FillModel,
Expand All @@ -22,6 +23,7 @@ import { IndexerOrderId, LiquidationOrderV1 } from '@dydxprotocol-indexer/v4-pro
import Long from 'long';
import * as pg from 'pg';

import config from '../../config';
import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE, SUBACCOUNT_ORDER_FILL_EVENT_TYPE } from '../../constants';
import { annotateWithPnl, convertPerpetualPosition } from '../../helpers/kafka-helper';
import { redisClient } from '../../helpers/redis/redis-controller';
Expand Down Expand Up @@ -99,6 +101,12 @@ export class LiquidationHandler extends AbstractOrderFillHandler<OrderFillWithLi
perpetualMarketRefresher.getPerpetualMarketsMap(),
market,
);
// Handle latency from resultRow
stats.timing(
`${config.SERVICE_NAME}.handle_liquidation_event.sql_latency`,
Number(resultRow.latency),
this.generateTimingStatsOptions(),
);

if (this.event.liquidity === Liquidity.MAKER) {
// Must be done in this order, because fills refer to an order
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { stats } from '@dydxprotocol-indexer/base';
import {
FillFromDatabase,
FillModel,
Expand All @@ -24,6 +25,7 @@ import {
import Long from 'long';
import * as pg from 'pg';

import config from '../../config';
import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE, SUBACCOUNT_ORDER_FILL_EVENT_TYPE } from '../../constants';
import { annotateWithPnl, convertPerpetualPosition } from '../../helpers/kafka-helper';
import { sendOrderFilledNotification } from '../../helpers/notifications/notifications-functions';
Expand Down Expand Up @@ -133,6 +135,13 @@ export class OrderHandler extends AbstractOrderFillHandler<OrderFillWithLiquidit
return kafkaEvents;
}

// Handle latency from resultRow
stats.timing(
`${config.SERVICE_NAME}.handle_order_fill_event.sql_latency`,
Number(resultRow.latency),
this.generateTimingStatsOptions(),
);

return kafkaEvents;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { stats } from '@dydxprotocol-indexer/base';
import {
PerpetualMarketFromDatabase, PerpetualMarketModel,
perpetualMarketRefresher,
} from '@dydxprotocol-indexer/postgres';
import { PerpetualMarketCreateEventV1, PerpetualMarketCreateEventV2 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../config';
import { generatePerpetualMarketMessage } from '../helpers/kafka-helper';
import { ConsolidatedKafkaEvent } from '../lib/types';
import { Handler } from './handler';
Expand All @@ -24,6 +26,12 @@ export class PerpetualMarketCreationHandler extends Handler<
resultRow.perpetual_market) as PerpetualMarketFromDatabase;

perpetualMarketRefresher.upsertPerpetualMarket(perpetualMarket);
// Handle latency from resultRow
stats.timing(
`${config.SERVICE_NAME}.handle_perpetual_market_event.sql_latency`,
Number(resultRow.latency),
this.generateTimingStatsOptions(),
);
return [
this.generateConsolidatedMarketKafkaEvent(
JSON.stringify(generatePerpetualMarketMessage([perpetualMarket])),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { stats } from '@dydxprotocol-indexer/base';
import {
OrderFromDatabase, OrderModel,
OrderTable,
Expand All @@ -10,6 +11,7 @@ import {
} from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../../config';
import { generateOrderSubaccountMessage } from '../../helpers/kafka-helper';
import { ConsolidatedKafkaEvent } from '../../lib/types';
import { AbstractStatefulOrderHandler } from '../abstract-stateful-order-handler';
Expand All @@ -33,6 +35,12 @@ export class ConditionalOrderPlacementHandler extends

const subaccountId:
IndexerSubaccountId = this.event.conditionalOrderPlacement!.order!.orderId!.subaccountId!;
// Handle latency from resultRow
stats.timing(
`${config.SERVICE_NAME}.handle_conditional_order_placement_event.sql_latency`,
Number(resultRow.latency),
this.generateTimingStatsOptions(),
);
return this.createKafkaEvents(subaccountId, order, perpetualMarket);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { stats } from '@dydxprotocol-indexer/base';
import {
OrderFromDatabase,
OrderTable,
Expand All @@ -15,6 +16,7 @@ import {
} from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../../config';
import { sendOrderTriggeredNotification } from '../../helpers/notifications/notifications-functions';
import { ConsolidatedKafkaEvent } from '../../lib/types';
import { AbstractStatefulOrderHandler } from '../abstract-stateful-order-handler';
Expand All @@ -40,7 +42,12 @@ export class ConditionalOrderTriggeredHandler extends

const indexerOrder: IndexerOrder = orderTranslations.convertToIndexerOrderWithSubaccount(
order, perpetualMarket, subaccount);

// Handle latency from resultRow
stats.timing(
`${config.SERVICE_NAME}.handle_conditional_order_triggered_event.sql_latency`,
Number(resultRow.latency),
this.generateTimingStatsOptions(),
);
await sendOrderTriggeredNotification(order, perpetualMarket, subaccount);
return this.createKafkaEvents(indexerOrder);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { stats } from '@dydxprotocol-indexer/base';
import { OrderTable } from '@dydxprotocol-indexer/postgres';
import { getOrderIdHash } from '@dydxprotocol-indexer/v4-proto-parser';
import {
Expand All @@ -6,7 +7,9 @@ import {
OrderPlaceV1_OrderPlacementStatus,
StatefulOrderEventV1,
} from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../../config';
import { ConsolidatedKafkaEvent } from '../../lib/types';
import { AbstractStatefulOrderHandler } from '../abstract-stateful-order-handler';

Expand All @@ -32,14 +35,20 @@ export class StatefulOrderPlacementHandler
}

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(): Promise<ConsolidatedKafkaEvent[]> {
public async internalHandle(resultRow: pg.QueryResultRow): Promise<ConsolidatedKafkaEvent[]> {
let order: IndexerOrder;
// TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent
if (this.event.orderPlace !== undefined) {
order = this.event.orderPlace!.order!;
} else {
order = this.event.longTermOrderPlacement!.order!;
}
// Handle latency from resultRow
stats.timing(
`${config.SERVICE_NAME}.handle_stateful_order_placement_event.sql_latency`,
Number(resultRow.latency),
this.generateTimingStatsOptions(),
);
return this.createKafkaEvents(order);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { stats } from '@dydxprotocol-indexer/base';
import {
OrderTable,
} from '@dydxprotocol-indexer/postgres';
Expand All @@ -10,6 +11,7 @@ import {
} from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../../config';
import { ConsolidatedKafkaEvent } from '../../lib/types';
import { AbstractStatefulOrderHandler } from '../abstract-stateful-order-handler';

Expand All @@ -24,8 +26,14 @@ export class StatefulOrderRemovalHandler extends
}

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(_: pg.QueryResultRow): Promise<ConsolidatedKafkaEvent[]> {
public async internalHandle(resultRow: pg.QueryResultRow): Promise<ConsolidatedKafkaEvent[]> {
const orderIdProto: IndexerOrderId = this.event.orderRemoval!.removedOrderId!;
// Handle latency from resultRow
stats.timing(
`${config.SERVICE_NAME}.handle_stateful_order_removal_event.sql_latency`,
Number(resultRow.latency),
this.generateTimingStatsOptions(),
);
return this.createKafkaEvents(orderIdProto);
}

Expand Down
Loading

0 comments on commit d034716

Please sign in to comment.