diff --git a/packages/destination-actions/src/destinations/kafka/constants.ts b/packages/destination-actions/src/destinations/kafka/constants.ts
index 2fa9fe172d..c1ea41dbcb 100644
--- a/packages/destination-actions/src/destinations/kafka/constants.ts
+++ b/packages/destination-actions/src/destinations/kafka/constants.ts
@@ -2,4 +2,6 @@ export const PRODUCER_TTL_MS = Number(process.env.KAFKA_PRODUCER_TTL_MS) || 0.5
 export const PRODUCER_REQUEST_TIMEOUT_MS = Number(process.env.KAFKA_PRODUCER_REQUEST_TIMEOUT_MS) || 10 * 1000 // defaults to 10 seconds
 
-export const FLAGON_NAME = 'actions-kafka-optimize-connection'
\ No newline at end of file
+export const FLAGON_NAME = 'actions-kafka-optimize-connection'
+
+export const CONNECTIONS_CACHE_SIZE = 500
diff --git a/packages/destination-actions/src/destinations/kafka/send/index.ts b/packages/destination-actions/src/destinations/kafka/send/index.ts
index 190e8a0320..01d7cad1f7 100644
--- a/packages/destination-actions/src/destinations/kafka/send/index.ts
+++ b/packages/destination-actions/src/destinations/kafka/send/index.ts
@@ -65,11 +65,11 @@ const action: ActionDefinition<Settings, Payload> = {
       return getTopics(settings)
     }
   },
-  perform: async (_request, { settings, payload, features, statsContext }) => {
-    await sendData(settings, [payload], features, statsContext)
+  perform: async (_request, { settings, payload, features, statsContext, subscriptionMetadata }) => {
+    await sendData(settings, [payload], features, statsContext, subscriptionMetadata)
   },
-  performBatch: async (_request, { settings, payload, features, statsContext }) => {
-    await sendData(settings, payload, features, statsContext)
+  performBatch: async (_request, { settings, payload, features, statsContext, subscriptionMetadata }) => {
+    await sendData(settings, payload, features, statsContext, subscriptionMetadata)
   }
 }
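For readers outside destination-kit: `subscriptionMetadata` is handed to `perform`/`performBatch` in the context object, and the new cache in `utils.ts` below keys producers on its `destinationConfigId`. A minimal sketch of that key derivation and its fallback (`SubscriptionMetadataLike` is an abbreviated stand-in, not the real destination-kit type):

```ts
// Sketch of the cache-key derivation used by getOrCreateProducerLRU below.
// SubscriptionMetadataLike is an abbreviated stand-in for the destination-kit type.
interface SubscriptionMetadataLike {
  destinationConfigId?: string
}

const cacheKey = (meta?: SubscriptionMetadataLike): string =>
  meta?.destinationConfigId ?? ''

console.log(cacheKey({ destinationConfigId: 'dest_abc' })) // 'dest_abc'
console.log(cacheKey(undefined)) // '' — every call without metadata shares this single slot
```

The `''` fallback is worth noting in review: all events that arrive without subscription metadata collide on one cache entry and therefore share a single producer.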
diff --git a/packages/destination-actions/src/destinations/kafka/utils.ts b/packages/destination-actions/src/destinations/kafka/utils.ts
index 78c0ccab59..62c8598bf2 100644
--- a/packages/destination-actions/src/destinations/kafka/utils.ts
+++ b/packages/destination-actions/src/destinations/kafka/utils.ts
@@ -3,8 +3,9 @@ import { DynamicFieldResponse, IntegrationError, Features } from '@segment/actio
 import type { Settings } from './generated-types'
 import type { Payload } from './send/generated-types'
 import { DEFAULT_PARTITIONER, Message, TopicMessages, SSLConfig, CachedProducer } from './types'
-import { PRODUCER_REQUEST_TIMEOUT_MS, PRODUCER_TTL_MS, FLAGON_NAME } from './constants'
-import { StatsContext } from '@segment/actions-core/destination-kit'
+import { PRODUCER_REQUEST_TIMEOUT_MS, PRODUCER_TTL_MS, FLAGON_NAME, CONNECTIONS_CACHE_SIZE } from './constants'
+import { StatsContext, SubscriptionMetadata } from '@segment/actions-core/destination-kit'
+import { LRUCache } from 'lru-cache'
 
 export const producersByConfig: Record<string, CachedProducer> = {}
 
@@ -177,11 +178,50 @@
   return producer
 }
 
+const connectionsCache = new LRUCache<string, Producer>({
+  max: CONNECTIONS_CACHE_SIZE,
+  ttl: 500,
+  dispose: (value, _key, _reason) => {
+    if (value) {
+      void value.disconnect().then(() => console.log('Kafka producer disconnected from cache eviction'))
+    }
+  }
+})
+
+// const kafkaProducerCache = new Map<string, Producer>()
+
+export const getOrCreateProducerLRU = async (
+  settings: Settings,
+  statsContext: StatsContext | undefined,
+  subscriptionMetadata: SubscriptionMetadata | undefined
+): Promise<Producer> => {
+  const key = subscriptionMetadata?.destinationConfigId ?? ''
+  const cachedProducer = connectionsCache.get(key)
+  // const cached = kafkaProducerCache.get(key)
+
+  statsContext?.statsClient?.incr('kafka_connection_cache_size', connectionsCache.size, statsContext?.tags)
+
+  if (cachedProducer) {
+    statsContext?.statsClient?.incr('kafka_connection_reused', 1, statsContext?.tags)
+    await cachedProducer.connect() // connect() is idempotent, so this is safe
+    return cachedProducer
+  }
+
+  const kafka = getKafka(settings)
+  const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner })
+  await producer.connect()
+  statsContext?.statsClient?.incr('kafka_connection_opened', 1, statsContext?.tags)
+  // kafkaProducerCache.set(key, producer)
+  connectionsCache.set(key, producer)
+  return producer
+}
+
 export const sendData = async (
   settings: Settings,
   payload: Payload[],
   features: Features | undefined,
-  statsContext: StatsContext | undefined
+  statsContext: StatsContext | undefined,
+  subscriptionMetadata?: SubscriptionMetadata
 ) => {
   validate(settings)
 
@@ -218,7 +258,7 @@
   let producer: Producer
 
   if (features && features[FLAGON_NAME]) {
-    producer = await getOrCreateProducer(settings, statsContext)
+    producer = await getOrCreateProducerLRU(settings, statsContext, subscriptionMetadata)
   } else {
     producer = getProducer(settings)
     await producer.connect()
@@ -236,12 +276,7 @@
     }
   }
 
-  if (features && features[FLAGON_NAME]) {
-    const key = serializeKafkaConfig(settings)
-    if (producersByConfig[key]) {
-      producersByConfig[key].lastUsed = Date.now()
-    }
-  } else {
+  if (!features || !features?.[FLAGON_NAME]) {
     await producer.disconnect()
   }
 }
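The connection-reuse guarantee rests on `lru-cache`'s `dispose` hook: it fires when an entry is evicted for capacity, overwritten, or purged after its `ttl`, which is what disconnects stale producers instead of leaking them. A standalone sketch of that pattern, using a stand-in for the kafkajs `Producer` (`FakeProducer` is illustrative only):

```ts
import { LRUCache } from 'lru-cache'

// Stand-in for a kafkajs Producer; only the lifecycle methods are modelled.
class FakeProducer {
  constructor(private id: string) {}
  async connect() { console.log(`${this.id} connected`) } // idempotent in kafkajs
  async disconnect() { console.log(`${this.id} disconnected`) }
}

const cache = new LRUCache<string, FakeProducer>({
  max: 2, // tiny capacity so eviction is easy to observe
  ttl: 500, // entries also expire after 500 ms, as in the diff
  dispose: (producer) => {
    // Fires on capacity eviction, overwrite, and purge of expired entries.
    void producer.disconnect()
  }
})

async function demo() {
  const a = new FakeProducer('a')
  await a.connect()
  cache.set('a', a)
  cache.set('b', new FakeProducer('b'))
  cache.set('c', new FakeProducer('c')) // evicts 'a' -> logs "a disconnected"
}

void demo()
```

One thing to double-check in review: with `ttl: 500` and no `updateAgeOnGet`, a cached producer expires half a second after it is stored even if it is being hit constantly, which undercuts the warm-connection goal; `ttl: PRODUCER_TTL_MS` together with `updateAgeOnGet: true` would track the TTL the existing `getOrCreateProducer` path already uses.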
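Net effect in `sendData`: the flag now selects between two producer lifecycles, and the final `disconnect` happens only on the unflagged path (cache eviction owns it otherwise). A condensed sketch of that control flow (`withProducer` and `ProducerLike` are illustrative names, not the repo's API):

```ts
// Condensed lifecycle comparison for the two sendData paths.
interface ProducerLike {
  connect(): Promise<void>
  disconnect(): Promise<void>
}

async function withProducer(
  optimized: boolean, // features[FLAGON_NAME] in the real code
  fromCache: () => Promise<ProducerLike>, // getOrCreateProducerLRU path
  fresh: () => ProducerLike, // getProducer path
  send: (p: ProducerLike) => Promise<void>
): Promise<void> {
  if (optimized) {
    const producer = await fromCache() // returned already connected
    await send(producer)
    // no disconnect here: the LRU dispose hook closes it on eviction
  } else {
    const producer = fresh()
    await producer.connect()
    await send(producer)
    await producer.disconnect() // per-call lifecycle, as before the flag
  }
}
```

This also explains why the old `lastUsed` bookkeeping on `producersByConfig` could be deleted: the LRU cache tracks recency itself.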