Skip to content

Commit

Permalink
Add instance to vulcan metrics (#2265)
Browse files Browse the repository at this point in the history
  • Loading branch information
roy-dydx authored Sep 16, 2024
1 parent 2882271 commit 46a1c88
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 17 deletions.
10 changes: 8 additions & 2 deletions indexer/services/vulcan/src/handlers/order-place-handler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { logger, runFuncWithTimingStat, stats } from '@dydxprotocol-indexer/base';
import {
logger, getInstanceId, runFuncWithTimingStat, stats,
} from '@dydxprotocol-indexer/base';
import { createSubaccountWebsocketMessage, KafkaTopics } from '@dydxprotocol-indexer/kafka';
import {
blockHeightRefresher,
Expand Down Expand Up @@ -89,7 +91,11 @@ export class OrderPlaceHandler extends Handler {
});

if (placeOrderResult.replaced) {
stats.increment(`${config.SERVICE_NAME}.place_order_handler.replaced_order`, 1);
stats.increment(
`${config.SERVICE_NAME}.place_order_handler.replaced_order`,
1,
{ instance: getInstanceId() },
);
}

// TODO(CLOB-597): Remove this logic and log erorrs once best-effort-open is not sent for
Expand Down
28 changes: 23 additions & 5 deletions indexer/services/vulcan/src/handlers/order-remove-handler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { logger, runFuncWithTimingStat, stats } from '@dydxprotocol-indexer/base';
import {
logger, getInstanceId, runFuncWithTimingStat, stats,
} from '@dydxprotocol-indexer/base';
import { KafkaTopics, SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, getTriggerPrice } from '@dydxprotocol-indexer/kafka';
import {
blockHeightRefresher,
Expand Down Expand Up @@ -90,7 +92,11 @@ export class OrderRemoveHandler extends Handler {
reason === OrderRemovalReason.ORDER_REMOVAL_REASON_INDEXER_EXPIRED &&
!(await this.isOrderExpired(orderRemove))
) {
stats.increment(`${config.SERVICE_NAME}.order_remove_reason_indexer_temp_expired`, 1);
stats.increment(
`${config.SERVICE_NAME}.order_remove_reason_indexer_temp_expired`,
1,
{ instance: getInstanceId() },
);
logger.info({
at: 'OrderRemoveHandler#handle',
message: 'Order was expired by Indexer but is no longer expired. Ignoring.',
Expand All @@ -116,7 +122,11 @@ export class OrderRemoveHandler extends Handler {
if (
orderRemove.reason === OrderRemovalReason.ORDER_REMOVAL_REASON_INDEXER_EXPIRED
) {
stats.increment(`${config.SERVICE_NAME}.order_remove_reason_indexer_expired`, 1);
stats.increment(
`${config.SERVICE_NAME}.order_remove_reason_indexer_expired`,
1,
{ instance: getInstanceId() },
);
logger.info({
at: 'OrderRemoveHandler#handle',
message: 'Order was expired by Indexer',
Expand Down Expand Up @@ -449,7 +459,11 @@ export class OrderRemoveHandler extends Handler {
this.generateTimingStatsOptions('find_order_for_indexer_expired_expiry_verification'),
);
if (redisOrder === null) {
stats.increment(`${config.SERVICE_NAME}.indexer_expired_order_not_found`, 1);
stats.increment(
`${config.SERVICE_NAME}.indexer_expired_order_not_found`,
1,
{ instance: getInstanceId() },
);
logger.info({
at: 'orderRemoveHandler#isOrderExpired',
message: 'Could not find order for Indexer-expired expiry verification',
Expand All @@ -473,7 +487,11 @@ export class OrderRemoveHandler extends Handler {

// We know the order is short-term, so the goodTilBlock must exist.
if (order.goodTilBlock! >= +block.blockHeight) {
stats.increment(`${config.SERVICE_NAME}.indexer_expired_order_is_not_expired`, 1);
stats.increment(
`${config.SERVICE_NAME}.indexer_expired_order_is_not_expired`,
1,
{ instance: getInstanceId() },
);
logger.info({
at: 'orderRemoveHandler#isOrderExpired',
message: 'Indexer marked order that is not yet expired as expired',
Expand Down
20 changes: 17 additions & 3 deletions indexer/services/vulcan/src/handlers/order-update-handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
logger,
getInstanceId,
runFuncWithTimingStat,
stats,
} from '@dydxprotocol-indexer/base';
Expand Down Expand Up @@ -102,6 +103,7 @@ export class OrderUpdateHandler extends Handler {
1,
{
orderFlags: String(orderFlags),
instance: getInstanceId(),
},
);
return;
Expand All @@ -110,7 +112,11 @@ export class OrderUpdateHandler extends Handler {
const sizeDeltaInQuantums: Big = this.getSizeDeltaInQuantums(updateResult, orderUpdate);

if (sizeDeltaInQuantums.eq(0)) {
stats.increment(`${config.SERVICE_NAME}.order_update_with_zero_delta.count`, 1);
stats.increment(
`${config.SERVICE_NAME}.order_update_with_zero_delta.count`,
1,
{ instance: getInstanceId() },
);
return;
}

Expand Down Expand Up @@ -192,7 +198,11 @@ export class OrderUpdateHandler extends Handler {
message: 'Old total filled quantums of order exceeds order size in quantums.',
updateResult,
});
stats.increment(`${config.SERVICE_NAME}.order_update_old_total_filled_exceeds_size`, 1);
stats.increment(
`${config.SERVICE_NAME}.order_update_old_total_filled_exceeds_size`,
1,
{ instance: getInstanceId() },
);
return Big(updateResult.order!.order!.quantums.toNumber().toString());
}

Expand All @@ -219,7 +229,11 @@ export class OrderUpdateHandler extends Handler {
orderUpdate,
updateResult,
});
stats.increment(`${config.SERVICE_NAME}.order_update_total_filled_exceeds_size`, 1);
stats.increment(
`${config.SERVICE_NAME}.order_update_total_filled_exceeds_size`,
1,
{ instance: getInstanceId() },
);

return Big(updateResult.order!.order!.quantums.toNumber().toString());
}
Expand Down
16 changes: 15 additions & 1 deletion indexer/services/vulcan/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { logger, startBugsnag, wrapBackgroundTask } from '@dydxprotocol-indexer/base';
import {
logger, getInstanceId, startBugsnag, setInstanceId, wrapBackgroundTask,
} from '@dydxprotocol-indexer/base';
import { stopConsumer, startConsumer } from '@dydxprotocol-indexer/kafka';
import { blockHeightRefresher, perpetualMarketRefresher } from '@dydxprotocol-indexer/postgres';

Expand All @@ -18,6 +20,18 @@ async function startService(): Promise<void> {

startBugsnag();

logger.info({
at: 'index#start',
message: 'Getting instance id...',
});

await setInstanceId();

logger.info({
at: 'index#start',
message: `Got instance id ${getInstanceId()}.`,
});

// Initialize PerpetualMarkets cache
await Promise.all([
blockHeightRefresher.updateBlockHeight(),
Expand Down
4 changes: 2 additions & 2 deletions indexer/services/vulcan/src/lib/on-batch.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { logger, stats } from '@dydxprotocol-indexer/base';
import { getInstanceId, logger, stats } from '@dydxprotocol-indexer/base';
import {
Batch,
EachBatchPayload,
Expand All @@ -13,7 +13,7 @@ export async function onBatch(
const batch: Batch = payload.batch;
const topic: string = batch.topic;
const partition: string = batch.partition.toString();
const metricTags: Record<string, string> = { topic, partition };
const metricTags: Record<string, string> = { topic, partition, instance: getInstanceId() };
if (batch.isEmpty()) {
logger.error({
at: 'on-batch#onBatch',
Expand Down
17 changes: 15 additions & 2 deletions indexer/services/vulcan/src/lib/on-message.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
getInstanceId,
logger,
stats,
ParseMessageError,
Expand Down Expand Up @@ -42,9 +43,17 @@ function getMessageType(update: OffChainUpdateV1): string {
}

export async function onMessage(message: KafkaMessage): Promise<void> {
stats.increment(`${config.SERVICE_NAME}.received_kafka_message`, 1);
stats.increment(
`${config.SERVICE_NAME}.received_kafka_message`,
1,
{ instance: getInstanceId() },
);
if (!message || !message.value || !message.timestamp) {
stats.increment(`${config.SERVICE_NAME}.empty_kafka_message`, 1);
stats.increment(
`${config.SERVICE_NAME}.empty_kafka_message`,
1,
{ instance: getInstanceId() },
);
logger.error({
at: 'onMessage#onMessage',
message: 'Empty message',
Expand All @@ -59,6 +68,7 @@ export async function onMessage(message: KafkaMessage): Promise<void> {
STATS_NO_SAMPLING,
{
topic: KafkaTopics.TO_VULCAN,
instance: getInstanceId(),
},
);

Expand All @@ -71,6 +81,7 @@ export async function onMessage(message: KafkaMessage): Promise<void> {
{
topic: KafkaTopics.TO_VULCAN,
event_type: String(message.headers?.event_type),
instance: getInstanceId(),
},
);
}
Expand Down Expand Up @@ -130,6 +141,7 @@ export async function onMessage(message: KafkaMessage): Promise<void> {
{
topic: KafkaTopics.TO_VULCAN,
event_type: String(headers?.event_type),
instance: getInstanceId(),
},
);
}
Expand Down Expand Up @@ -164,6 +176,7 @@ export async function onMessage(message: KafkaMessage): Promise<void> {
{
success: success.toString(),
messageType: getMessageType(update),
instance: getInstanceId(),
},
);
}
Expand Down
5 changes: 3 additions & 2 deletions indexer/services/vulcan/src/lib/send-message-helper.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {
logger, stats, STATS_NO_SAMPLING, wrapBackgroundTask,
getInstanceId, logger, stats, STATS_NO_SAMPLING, wrapBackgroundTask,
} from '@dydxprotocol-indexer/base';
import { producer } from '@dydxprotocol-indexer/kafka';
import { Message } from 'kafkajs';
Expand Down Expand Up @@ -77,7 +77,7 @@ async function sendMessages(topic: string): Promise<void> {

const messages: Message[] = queuedMessages[topic];
if (messages === undefined || messages.length === 0) {
stats.histogram(sizeStat, 0, STATS_NO_SAMPLING, { topic, success: 'true' });
stats.histogram(sizeStat, 0, STATS_NO_SAMPLING, { topic, success: 'true', instance: getInstanceId() });
return;
}
queuedMessages[topic] = [];
Expand Down Expand Up @@ -107,6 +107,7 @@ async function sendMessages(topic: string): Promise<void> {
const tags: {[name: string]: string} = {
topic,
success: success.toString(),
instance: getInstanceId(),
};
stats.histogram(sizeStat, messages.length, STATS_NO_SAMPLING, tags);
stats.timing(timingStat, Date.now() - start, STATS_NO_SAMPLING, tags);
Expand Down

0 comments on commit 46a1c88

Please sign in to comment.