diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index 5deaa12fa3..e0f394b1a0 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -30,7 +30,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { level, client, client.getQueueChannelConfig(CrowdQueue.DATA_SINK_WORKER), - 3, + Number(process.env.WORKER_MAX_CONCURRENCY || 1), parentLog, undefined, undefined, @@ -58,7 +58,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { break case DataSinkWorkerQueueMessageType.CREATE_AND_PROCESS_ACTIVITY_RESULT: { const msg = message as CreateAndProcessActivityResultQueueMessage - await service.createAndProcessActivityResult( + await service.processActivityInMemoryResult( msg.tenantId, msg.segmentId, msg.integrationId, diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index b7ff422358..d4ddaca3e8 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -1,4 +1,4 @@ -import { addSeconds } from '@crowd/common' +import { addSeconds, generateUUIDv1 } from '@crowd/common' import { DataSinkWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/common_services' import { DbStore } from '@crowd/data-access-layer/src/database' import { IResultData } from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.data' @@ -42,6 +42,7 @@ export default class DataSinkService extends LoggerBase { private async triggerResultError( resultInfo: IResultData, + isCreated: boolean, location: string, message: string, metadata?: unknown, @@ -63,9 +64,15 @@ export default class DataSinkService extends LoggerBase { // delay for #retries * 2 minutes const until = addSeconds(new Date(), (resultInfo.retries + 1) * 2 * 60) this.log.warn({ until: until.toISOString() }, 'Retrying result!') - await this.repo.delayResult(resultInfo.id, until, errorData) + + await this.repo.delayResult( + resultInfo.id, + until, + errorData, + isCreated ? undefined : resultInfo, + ) } else { - await this.repo.markResultError(resultInfo.id, errorData) + await this.repo.markResultError(resultInfo.id, errorData, isCreated ? undefined : resultInfo) } } @@ -98,13 +105,13 @@ export default class DataSinkService extends LoggerBase { } } - public async createAndProcessActivityResult( + public async processActivityInMemoryResult( tenantId: string, segmentId: string, integrationId: string, data: IActivityData, ): Promise { - this.log.info({ tenantId, segmentId }, 'Creating and processing activity result.') + this.log.info({ tenantId, segmentId }, 'Processing in memory activity result.') const payload = { type: IntegrationResultType.ACTIVITY, @@ -112,13 +119,15 @@ export default class DataSinkService extends LoggerBase { segmentId, } - const [integration, resultId] = await Promise.all([ - integrationId ? this.repo.getIntegrationInfo(integrationId) : Promise.resolve(null), - this.repo.createResult(tenantId, integrationId, payload), - ]) + let integration + + if (integrationId) { + integration = await this.repo.getIntegrationInfo(integrationId) + } + const id = generateUUIDv1() const result: IResultData = { - id: resultId, + id, tenantId, integrationId, data: payload, @@ -132,7 +141,7 @@ export default class DataSinkService extends LoggerBase { onboarding: false, } - await this.processResult(resultId, result) + await this.processResult(id, result) } public async processResult(resultId: string, result?: IResultData): Promise { @@ -263,13 +272,18 @@ export default class DataSinkService extends LoggerBase { type: data.type, }, ) - await this.repo.deleteResult(resultId) + + if (!result) { + await this.repo.deleteResult(resultId) + } + return true } catch (err) { this.log.error(err, 'Error processing result.') try { await this.triggerResultError( resultInfo, + result === undefined, 'process-result', 'Error processing result.', undefined, diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts index d54795dfe7..65602e5dc0 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts @@ -1,7 +1,7 @@ import { distinct, singleOrDefault } from '@crowd/common' import { DbStore, RepositoryBase } from '@crowd/database' import { Logger } from '@crowd/logging' -import { IIntegrationResult, IntegrationResultState } from '@crowd/types' +import { IntegrationResultState } from '@crowd/types' import { IDelayedResults, IFailedResultData, IIntegrationData, IResultData } from './dataSink.data' @@ -46,28 +46,6 @@ export default class DataSinkRepository extends RepositoryBase { - const results = await this.db().one( - ` - insert into integration.results(state, data, "tenantId", "integrationId") - values($(state), $(data), $(tenantId), $(integrationId)) - returning id; - `, - { - tenantId, - integrationId, - state: IntegrationResultState.PENDING, - data: JSON.stringify(result), - }, - ) - - return results.id - } - public async getOldResultsToProcessForTenant( tenantId: string, limit: number, @@ -147,22 +125,44 @@ export default class DataSinkRepository extends RepositoryBase { - const result = await this.db().result( - `update integration.results - set state = $(state), - "processedAt" = now(), - error = $(error), - "updatedAt" = now() - where id = $(resultId)`, - { - resultId, - state: IntegrationResultState.ERROR, - error: JSON.stringify(error), - }, - ) + public async markResultError( + resultId: string, + error: unknown, + resultToCreate?: IResultData, + ): Promise { + if (resultToCreate) { + const result = await this.db().result( + ` + insert into integration.results(state, data, "tenantId", "integrationId", error) + values($(state), $(data), $(tenantId), $(integrationId), $(error)) + `, + { + tenantId: resultToCreate.tenantId, + integrationId: resultToCreate.integrationId, + state: IntegrationResultState.ERROR, + data: JSON.stringify(resultToCreate.data), + error: JSON.stringify(error), + }, + ) - this.checkUpdateRowCount(result.rowCount, 1) + this.checkUpdateRowCount(result.rowCount, 1) + } else { + const result = await this.db().result( + `update integration.results + set state = $(state), + "processedAt" = now(), + error = $(error), + "updatedAt" = now() + where id = $(resultId)`, + { + resultId, + state: IntegrationResultState.ERROR, + error: JSON.stringify(error), + }, + ) + + this.checkUpdateRowCount(result.rowCount, 1) + } } public async deleteResult(resultId: string): Promise { @@ -266,24 +266,48 @@ export default class DataSinkRepository extends RepositoryBase r.id) } - public async delayResult(resultId: string, until: Date, error: unknown): Promise { - const result = await this.db().result( - `update integration.results - set state = $(state), - error = $(error), - "delayedUntil" = $(until), - retries = coalesce(retries, 0) + 1, - "updatedAt" = now() - where id = $(resultId)`, - { - resultId, - until, - error: JSON.stringify(error), - state: IntegrationResultState.DELAYED, - }, - ) + public async delayResult( + resultId: string, + until: Date, + error: unknown, + resultToCreate?: IResultData, + ): Promise { + if (resultToCreate) { + const result = await this.db().result( + ` + insert into integration.results(state, data, "tenantId", "integrationId", error, retries, "delayedUntil") + values($(state), $(data), $(tenantId), $(integrationId), $(error), $(retries), $(until)) + `, + { + tenantId: resultToCreate.tenantId, + integrationId: resultToCreate.integrationId, + state: IntegrationResultState.DELAYED, + data: JSON.stringify(resultToCreate.data), + retries: 1, + error: JSON.stringify(error), + until: until, + }, + ) + this.checkUpdateRowCount(result.rowCount, 1) + } else { + const result = await this.db().result( + `update integration.results + set state = $(state), + error = $(error), + "delayedUntil" = $(until), + retries = coalesce(retries, 0) + 1, + "updatedAt" = now() + where id = $(resultId)`, + { + resultId, + until, + error: JSON.stringify(error), + state: IntegrationResultState.DELAYED, + }, + ) - this.checkUpdateRowCount(result.rowCount, 1) + this.checkUpdateRowCount(result.rowCount, 1) + } } public async getDelayedResults(limit: number): Promise { diff --git a/services/libs/queue/src/vendors/kafka/client.ts b/services/libs/queue/src/vendors/kafka/client.ts index 1b74b3da7f..44bd2ec74e 100644 --- a/services/libs/queue/src/vendors/kafka/client.ts +++ b/services/libs/queue/src/vendors/kafka/client.ts @@ -42,6 +42,38 @@ export class KafkaQueueService extends LoggerBase implements IQueue { this.reconnectAttempts = new Map() this.consumerStatus = new Map() } + async getQueueMessageCount(conf: IKafkaChannelConfig): Promise { + const groupId = conf.name + const topic = conf.name + + const admin = this.client.admin() + await admin.connect() + + try { + const topicOffsets = await admin.fetchTopicOffsets(topic) + const offsetsResponse = await admin.fetchOffsets({ + groupId: groupId, + topics: [topic], + }) + + const offsets = offsetsResponse[0].partitions + + let totalLeft = 0 + for (const offset of offsets) { + const topicOffset = topicOffsets.find((p) => p.partition === offset.partition) + if (topicOffset.offset !== offset.offset) { + totalLeft += Number(topicOffset.offset) - Number(offset.offset) + } + } + + return totalLeft + } catch (err) { + this.log.error(err, 'Failed to get message count!') + throw err + } finally { + await admin.disconnect() + } + } public async send( channel: IQueueChannel, @@ -294,6 +326,7 @@ export class KafkaQueueService extends LoggerBase implements IQueue { let retries = options?.retry || 0 let healthCheckInterval + let statisticsInterval try { this.started = true @@ -318,6 +351,33 @@ export class KafkaQueueService extends LoggerBase implements IQueue { } }, 10 * 60000) // Check every 10 minutes + let timings = [] + + statisticsInterval = setInterval(async () => { + if (!this.started) { + clearInterval(statisticsInterval) + return + } + + try { + // Reset the timings array and calculate the average processing time + const durations = [...timings] + timings = [] + + // Get the number of messages left in the queue + const count = await this.getQueueMessageCount(queueConf) + + let message = `Topic has ${count} messages left!` + if (durations.length > 0) { + const average = durations.reduce((a, b) => a + b, 0) / durations.length + message += ` In the last minute ${durations.length} messages were processed (${(durations.length / 60.0).toFixed(2)} msg/s) - average processing time: ${average.toFixed(2)}ms!` + } + this.log.info({ topic: queueConf.name }, message) + } catch (err) { + // do nothing + } + }, 60000) // check every minute + this.log.trace({ topic: queueConf.name }, 'Subscribed to topic! Starting the consmer...') await consumer.run({ @@ -334,10 +394,12 @@ export class KafkaQueueService extends LoggerBase implements IQueue { processMessage(data) .then(() => { const duration = performance.now() - now - this.log.info(`Message processed successfully in ${duration.toFixed(2)}ms!`) + timings.push(duration) + this.log.debug(`Message processed successfully in ${duration.toFixed(2)}ms!`) }) .catch((err) => { const duration = performance.now() - now + timings.push(duration) this.log.error(err, `Message processed unsuccessfully in ${duration.toFixed(2)}ms!`) }) .finally(() => { @@ -349,6 +411,7 @@ export class KafkaQueueService extends LoggerBase implements IQueue { } catch (e) { this.log.trace({ topic: queueConf.name, error: e }, 'Failed to start the queue!') clearInterval(healthCheckInterval) + clearInterval(statisticsInterval) if (retries < MAX_RETRY_FOR_CONNECTING_CONSUMER) { retries++ this.log.trace({ topic: queueConf.name, retries }, 'Retrying to start the queue...')