Skip to content

Commit

Permalink
Create consumer using rack id (#2352) (#2393)
Browse files Browse the repository at this point in the history
(cherry picked from commit 0a3b3b8)
  • Loading branch information
roy-dydx authored and mergify[bot] committed Sep 30, 2024
1 parent 0502f73 commit 1c24f1c
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 46 deletions.
4 changes: 2 additions & 2 deletions indexer/packages/kafka/__tests__/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import { TO_ENDER_TOPIC } from '../src';
describe.skip('consumer', () => {
beforeAll(async () => {
await Promise.all([
consumer.connect(),
consumer!.connect(),
producer.connect(),
]);
await consumer.subscribe({ topic: TO_ENDER_TOPIC });
await consumer!.subscribe({ topic: TO_ENDER_TOPIC });
await startConsumer();
});

Expand Down
75 changes: 42 additions & 33 deletions indexer/packages/kafka/src/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
getAvailabilityZoneId,
logger,
} from '@dydxprotocol-indexer/base';
import {
Expand All @@ -13,15 +14,10 @@ const groupIdPrefix: string = config.SERVICE_NAME;
const groupIdSuffix: string = config.KAFKA_ENABLE_UNIQUE_CONSUMER_GROUP_IDS ? `_${uuidv4()}` : '';
const groupId: string = `${groupIdPrefix}${groupIdSuffix}`;

export const consumer: Consumer = kafka.consumer({
groupId,
sessionTimeout: config.KAFKA_SESSION_TIMEOUT_MS,
rebalanceTimeout: config.KAFKA_REBALANCE_TIMEOUT_MS,
heartbeatInterval: config.KAFKA_HEARTBEAT_INTERVAL_MS,
maxWaitTimeInMs: config.KAFKA_WAIT_MAX_TIME_MS,
readUncommitted: false,
maxBytes: 4194304, // 4MB
});
// As a hack, we made this mutable since CommonJS doesn't support top level await.
// Top level await would needed to fetch the az id (used as rack id).
// eslint-disable-next-line import/no-mutable-exports
export let consumer: Consumer | undefined;

// List of functions to run per message consumed.
let onMessageFunction: (topic: string, message: KafkaMessage) => Promise<void>;
Expand Down Expand Up @@ -51,38 +47,51 @@ export function updateOnBatchFunction(
// Whether the consumer is stopped.
let stopped: boolean = false;

consumer.on('consumer.disconnect', async () => {
export async function stopConsumer(): Promise<void> {
logger.info({
at: 'consumers#disconnect',
message: 'Kafka consumer disconnected',
at: 'kafka-consumer#stop',
message: 'Stopping kafka consumer',
groupId,
});

if (!stopped) {
await consumer.connect();
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Kafka consumer reconnected',
groupId,
});
} else {
stopped = true;
await consumer!.disconnect();
}

export async function initConsumer(): Promise<void> {
consumer = kafka.consumer({
groupId,
sessionTimeout: config.KAFKA_SESSION_TIMEOUT_MS,
rebalanceTimeout: config.KAFKA_REBALANCE_TIMEOUT_MS,
heartbeatInterval: config.KAFKA_HEARTBEAT_INTERVAL_MS,
maxWaitTimeInMs: config.KAFKA_WAIT_MAX_TIME_MS,
readUncommitted: false,
maxBytes: 4194304, // 4MB
rackId: await getAvailabilityZoneId(),
});

consumer!.on('consumer.disconnect', async () => {
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Not reconnecting since task is shutting down',
at: 'consumers#disconnect',
message: 'Kafka consumer disconnected',
groupId,
});
}
});

export async function stopConsumer(): Promise<void> {
logger.info({
at: 'kafka-consumer#stop',
message: 'Stopping kafka consumer',
groupId,
if (!stopped) {
await consumer!.connect();
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Kafka consumer reconnected',
groupId,
});
} else {
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Not reconnecting since task is shutting down',
groupId,
});
}
});

stopped = true;
await consumer.disconnect();
}

export async function startConsumer(batchProcessing: boolean = false): Promise<void> {
Expand All @@ -104,7 +113,7 @@ export async function startConsumer(batchProcessing: boolean = false): Promise<v
};
}

await consumer.run(consumerRunConfig);
await consumer!.run(consumerRunConfig);

logger.info({
at: 'consumers#connect',
Expand Down
6 changes: 3 additions & 3 deletions indexer/services/ender/src/helpers/kafka/kafka-controller.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import { logger } from '@dydxprotocol-indexer/base';
import {
consumer, producer, TO_ENDER_TOPIC, updateOnMessageFunction,
consumer, initConsumer, producer, TO_ENDER_TOPIC, updateOnMessageFunction,
} from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';

import { onMessage } from '../../lib/on-message';

export async function connect(): Promise<void> {
await Promise.all([
consumer.connect(),
initConsumer(),
producer.connect(),
]);

await consumer.subscribe({
await consumer!.subscribe({
topic: TO_ENDER_TOPIC,
// https://kafka.js.org/docs/consuming#a-name-from-beginning-a-frombeginning
// Need to set fromBeginning to true, so when ender restarts, it will consume all messages
Expand Down
6 changes: 3 additions & 3 deletions indexer/services/scripts/src/print-block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export function seek(offset: bigint): void {
offset: offset.toString(),
});

consumer.seek({
consumer!.seek({
topic: TO_ENDER_TOPIC,
partition: 0,
offset: offset.toString(),
Expand All @@ -57,11 +57,11 @@ export function seek(offset: bigint): void {

export async function connect(height: number): Promise<void> {
await Promise.all([
consumer.connect(),
consumer!.connect(),
producer.connect(),
]);

await consumer.subscribe({
await consumer!.subscribe({
topic: TO_ENDER_TOPIC,
fromBeginning: true,
});
Expand Down
5 changes: 3 additions & 2 deletions indexer/services/socks/src/helpers/kafka/kafka-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ import { logger } from '@dydxprotocol-indexer/base';
import {
WebsocketTopics,
consumer,
initConsumer,
stopConsumer,
} from '@dydxprotocol-indexer/kafka';

export async function connect(): Promise<void> {
await consumer.connect();
await initConsumer();

logger.info({
at: 'kafka-controller#connect',
message: 'Connected to Kafka',
});

await consumer.subscribe({ topics: Object.values(WebsocketTopics) });
await consumer!.subscribe({ topics: Object.values(WebsocketTopics) });
}

export async function disconnect(): Promise<void> {
Expand Down
6 changes: 3 additions & 3 deletions indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { logger } from '@dydxprotocol-indexer/base';
import {
consumer, producer, KafkaTopics, updateOnMessageFunction, updateOnBatchFunction,
consumer, initConsumer, producer, KafkaTopics, updateOnMessageFunction, updateOnBatchFunction,
} from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';

Expand All @@ -10,11 +10,11 @@ import { onMessage } from '../../lib/on-message';

export async function connect(): Promise<void> {
await Promise.all([
consumer.connect(),
initConsumer(),
producer.connect(),
]);

await consumer.subscribe({
await consumer!.subscribe({
topic: KafkaTopics.TO_VULCAN,
// https://kafka.js.org/docs/consuming#a-name-from-beginning-a-frombeginning
// fromBeginning is by default set to false, so vulcan will only consume messages produced
Expand Down

0 comments on commit 1c24f1c

Please sign in to comment.