From 1c24f1c8ca2928a3c6a8937df8d9433433d54c35 Mon Sep 17 00:00:00 2001 From: roy-dydx <133032749+roy-dydx@users.noreply.github.com> Date: Fri, 27 Sep 2024 15:18:43 -0400 Subject: [PATCH] Create consumer using rack id (#2352) (#2393) (cherry picked from commit 0a3b3b8d6031a7a44192b082871267aa6f1616a6) --- .../packages/kafka/__tests__/consumer.test.ts | 4 +- indexer/packages/kafka/src/consumer.ts | 75 +++++++++++-------- .../src/helpers/kafka/kafka-controller.ts | 6 +- indexer/services/scripts/src/print-block.ts | 6 +- .../src/helpers/kafka/kafka-controller.ts | 5 +- .../src/helpers/kafka/kafka-controller.ts | 6 +- 6 files changed, 56 insertions(+), 46 deletions(-) diff --git a/indexer/packages/kafka/__tests__/consumer.test.ts b/indexer/packages/kafka/__tests__/consumer.test.ts index de801b2dfe..e05d67d5e3 100644 --- a/indexer/packages/kafka/__tests__/consumer.test.ts +++ b/indexer/packages/kafka/__tests__/consumer.test.ts @@ -10,10 +10,10 @@ import { TO_ENDER_TOPIC } from '../src'; describe.skip('consumer', () => { beforeAll(async () => { await Promise.all([ - consumer.connect(), + consumer!.connect(), producer.connect(), ]); - await consumer.subscribe({ topic: TO_ENDER_TOPIC }); + await consumer!.subscribe({ topic: TO_ENDER_TOPIC }); await startConsumer(); }); diff --git a/indexer/packages/kafka/src/consumer.ts b/indexer/packages/kafka/src/consumer.ts index 93c41d12c5..82c26cf1e0 100644 --- a/indexer/packages/kafka/src/consumer.ts +++ b/indexer/packages/kafka/src/consumer.ts @@ -1,4 +1,5 @@ import { + getAvailabilityZoneId, logger, } from '@dydxprotocol-indexer/base'; import { @@ -13,15 +14,10 @@ const groupIdPrefix: string = config.SERVICE_NAME; const groupIdSuffix: string = config.KAFKA_ENABLE_UNIQUE_CONSUMER_GROUP_IDS ? `_${uuidv4()}` : ''; const groupId: string = `${groupIdPrefix}${groupIdSuffix}`; -export const consumer: Consumer = kafka.consumer({ - groupId, - sessionTimeout: config.KAFKA_SESSION_TIMEOUT_MS, - rebalanceTimeout: config.KAFKA_REBALANCE_TIMEOUT_MS, - heartbeatInterval: config.KAFKA_HEARTBEAT_INTERVAL_MS, - maxWaitTimeInMs: config.KAFKA_WAIT_MAX_TIME_MS, - readUncommitted: false, - maxBytes: 4194304, // 4MB -}); +// As a hack, we made this mutable since CommonJS doesn't support top level await. +// Top level await would needed to fetch the az id (used as rack id). +// eslint-disable-next-line import/no-mutable-exports +export let consumer: Consumer | undefined; // List of functions to run per message consumed. let onMessageFunction: (topic: string, message: KafkaMessage) => Promise; @@ -51,38 +47,51 @@ export function updateOnBatchFunction( // Whether the consumer is stopped. let stopped: boolean = false; -consumer.on('consumer.disconnect', async () => { +export async function stopConsumer(): Promise { logger.info({ - at: 'consumers#disconnect', - message: 'Kafka consumer disconnected', + at: 'kafka-consumer#stop', + message: 'Stopping kafka consumer', groupId, }); - if (!stopped) { - await consumer.connect(); - logger.info({ - at: 'kafka-consumer#disconnect', - message: 'Kafka consumer reconnected', - groupId, - }); - } else { + stopped = true; + await consumer!.disconnect(); +} + +export async function initConsumer(): Promise { + consumer = kafka.consumer({ + groupId, + sessionTimeout: config.KAFKA_SESSION_TIMEOUT_MS, + rebalanceTimeout: config.KAFKA_REBALANCE_TIMEOUT_MS, + heartbeatInterval: config.KAFKA_HEARTBEAT_INTERVAL_MS, + maxWaitTimeInMs: config.KAFKA_WAIT_MAX_TIME_MS, + readUncommitted: false, + maxBytes: 4194304, // 4MB + rackId: await getAvailabilityZoneId(), + }); + + consumer!.on('consumer.disconnect', async () => { logger.info({ - at: 'kafka-consumer#disconnect', - message: 'Not reconnecting since task is shutting down', + at: 'consumers#disconnect', + message: 'Kafka consumer disconnected', groupId, }); - } -}); -export async function stopConsumer(): Promise { - logger.info({ - at: 'kafka-consumer#stop', - message: 'Stopping kafka consumer', - groupId, + if (!stopped) { + await consumer!.connect(); + logger.info({ + at: 'kafka-consumer#disconnect', + message: 'Kafka consumer reconnected', + groupId, + }); + } else { + logger.info({ + at: 'kafka-consumer#disconnect', + message: 'Not reconnecting since task is shutting down', + groupId, + }); + } }); - - stopped = true; - await consumer.disconnect(); } export async function startConsumer(batchProcessing: boolean = false): Promise { @@ -104,7 +113,7 @@ export async function startConsumer(batchProcessing: boolean = false): Promise { await Promise.all([ - consumer.connect(), + initConsumer(), producer.connect(), ]); - await consumer.subscribe({ + await consumer!.subscribe({ topic: TO_ENDER_TOPIC, // https://kafka.js.org/docs/consuming#a-name-from-beginning-a-frombeginning // Need to set fromBeginning to true, so when ender restarts, it will consume all messages diff --git a/indexer/services/scripts/src/print-block.ts b/indexer/services/scripts/src/print-block.ts index 05855229ca..34d32a7728 100644 --- a/indexer/services/scripts/src/print-block.ts +++ b/indexer/services/scripts/src/print-block.ts @@ -42,7 +42,7 @@ export function seek(offset: bigint): void { offset: offset.toString(), }); - consumer.seek({ + consumer!.seek({ topic: TO_ENDER_TOPIC, partition: 0, offset: offset.toString(), @@ -57,11 +57,11 @@ export function seek(offset: bigint): void { export async function connect(height: number): Promise { await Promise.all([ - consumer.connect(), + consumer!.connect(), producer.connect(), ]); - await consumer.subscribe({ + await consumer!.subscribe({ topic: TO_ENDER_TOPIC, fromBeginning: true, }); diff --git a/indexer/services/socks/src/helpers/kafka/kafka-controller.ts b/indexer/services/socks/src/helpers/kafka/kafka-controller.ts index 03409f2849..247819f8bf 100644 --- a/indexer/services/socks/src/helpers/kafka/kafka-controller.ts +++ b/indexer/services/socks/src/helpers/kafka/kafka-controller.ts @@ -2,18 +2,19 @@ import { logger } from '@dydxprotocol-indexer/base'; import { WebsocketTopics, consumer, + initConsumer, stopConsumer, } from '@dydxprotocol-indexer/kafka'; export async function connect(): Promise { - await consumer.connect(); + await initConsumer(); logger.info({ at: 'kafka-controller#connect', message: 'Connected to Kafka', }); - await consumer.subscribe({ topics: Object.values(WebsocketTopics) }); + await consumer!.subscribe({ topics: Object.values(WebsocketTopics) }); } export async function disconnect(): Promise { diff --git a/indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts b/indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts index ae2038f0f3..528eb4b851 100644 --- a/indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts +++ b/indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts @@ -1,6 +1,6 @@ import { logger } from '@dydxprotocol-indexer/base'; import { - consumer, producer, KafkaTopics, updateOnMessageFunction, updateOnBatchFunction, + consumer, initConsumer, producer, KafkaTopics, updateOnMessageFunction, updateOnBatchFunction, } from '@dydxprotocol-indexer/kafka'; import { KafkaMessage } from 'kafkajs'; @@ -10,11 +10,11 @@ import { onMessage } from '../../lib/on-message'; export async function connect(): Promise { await Promise.all([ - consumer.connect(), + initConsumer(), producer.connect(), ]); - await consumer.subscribe({ + await consumer!.subscribe({ topic: KafkaTopics.TO_VULCAN, // https://kafka.js.org/docs/consuming#a-name-from-beginning-a-frombeginning // fromBeginning is by default set to false, so vulcan will only consume messages produced