From b6e44c9c0d5b840d4715a5e9a6263bda41cd1f16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 10 Jan 2025 18:14:46 +0100 Subject: [PATCH 01/24] log performance --- services/libs/queue/src/vendors/kafka/client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/libs/queue/src/vendors/kafka/client.ts b/services/libs/queue/src/vendors/kafka/client.ts index 130d7c5a44..d3704d1819 100644 --- a/services/libs/queue/src/vendors/kafka/client.ts +++ b/services/libs/queue/src/vendors/kafka/client.ts @@ -327,7 +327,7 @@ export class KafkaQueueService extends LoggerBase implements IQueue { try { await processMessage(data) const duration = performance.now() - now - this.log.debug(`Message processed successfully in ${duration.toFixed(2)}ms!`) + this.log.info(`Message processed successfully in ${duration.toFixed(2)}ms!`) } catch (err) { const duration = performance.now() - now this.log.error(err, `Message processed unsuccessfully in ${duration.toFixed(2)}ms!`) From be397bde78c9cfcfc24098889a5e47a43f7d2e6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 10 Jan 2025 22:26:15 +0100 Subject: [PATCH 02/24] log performance --- services/apps/data_sink_worker/src/service/dataSink.service.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index ff9c0b3c77..7a6ea6b1c8 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -117,6 +117,7 @@ export default class DataSinkService extends LoggerBase { if (!resultInfo) { telemetry.increment('data_sync_worker.result_not_found', 1) + this.log.info('Result not found. Skipping...') return false } From fc3891749d63998845ed4286538062702762ddf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 10 Jan 2025 22:45:54 +0100 Subject: [PATCH 03/24] fixes --- .../src/service/dataSink.service.ts | 35 ++++++++++++++++--- .../data_sink_worker/repo/dataSink.data.ts | 14 ++++---- .../data_sink_worker/repo/dataSink.repo.ts | 20 +++++++---- 3 files changed, 50 insertions(+), 19 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index 7a6ea6b1c8..b67e6acd77 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -11,6 +11,7 @@ import { IActivityData, IMemberData, IOrganization, + IntegrationResultState, IntegrationResultType, PlatformType, } from '@crowd/types' @@ -101,19 +102,43 @@ export default class DataSinkService extends LoggerBase { ): Promise { this.log.debug({ tenantId, segmentId }, 'Creating and processing activity result.') - const resultId = await this.repo.createResult(tenantId, integrationId, { + const payload = { type: IntegrationResultType.ACTIVITY, data, segmentId, - }) + } + + const [integration, resultId] = await Promise.all([ + integrationId ? this.repo.getIntegrationInfo(integrationId) : Promise.resolve(null), + this.repo.createResult(tenantId, integrationId, payload), + ]) + + const result: IResultData = { + id: resultId, + tenantId, + integrationId, + data: payload, + state: IntegrationResultState.PENDING, + runId: null, + streamId: null, + webhookId: null, + platform: integration ? 
integration.platform : null, + retries: 0, + delayedUntil: null, + onboarding: false, + } - await this.processResult(resultId) + await this.processResult(resultId, result) } - public async processResult(resultId: string): Promise { + public async processResult(resultId: string, result?: IResultData): Promise { this.log.debug({ resultId }, 'Processing result.') - const resultInfo = await this.repo.getResultInfo(resultId) + let resultInfo = result + + if (!resultInfo) { + resultInfo = await this.repo.getResultInfo(resultId) + } if (!resultInfo) { telemetry.increment('data_sync_worker.result_not_found', 1) diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.data.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.data.ts index f26b198e75..11245bbc42 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.data.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.data.ts @@ -5,7 +5,12 @@ import { PlatformType, } from '@crowd/types' -export interface IResultData { +export interface IIntegrationData { + integrationId: string + platform: PlatformType +} + +export interface IResultData extends IIntegrationData { id: string state: IntegrationResultState data: IIntegrationResult @@ -15,13 +20,6 @@ export interface IResultData { webhookId: string | null streamId: string tenantId: string - integrationId: string - platform: PlatformType - - hasSampleData: boolean - plan: string - isTrialPlan: boolean - name: string retries: number delayedUntil: string | null diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts index 316b354e04..d54795dfe7 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts @@ -3,7 +3,7 @@ import { DbStore, RepositoryBase } from '@crowd/database' import { Logger } from '@crowd/logging' import { IIntegrationResult, IntegrationResultState } from '@crowd/types' -import { IDelayedResults, IFailedResultData, IResultData } from './dataSink.data' +import { IDelayedResults, IFailedResultData, IIntegrationData, IResultData } from './dataSink.data' export default class DataSinkRepository extends RepositoryBase { constructor(dbStore: DbStore, parentLog: Logger) { @@ -22,14 +22,9 @@ export default class DataSinkRepository extends RepositoryBase { + const result = await this.db().oneOrNone( + `select id as "integrationId", + platform + from integrations where id = $(integrationId)`, + { + integrationId, + }, + ) + + return result + } + public async createResult( tenantId: string, integrationId: string, From 04a48eae9a52b486313dbbf876dcf33201581bab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 10 Jan 2025 22:53:04 +0100 Subject: [PATCH 04/24] fixes --- .../apps/data_sink_worker/src/service/dataSink.service.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index b67e6acd77..6f555b7626 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -100,7 +100,7 @@ export default class DataSinkService extends LoggerBase { integrationId: string, data: 
IActivityData, ): Promise { - this.log.debug({ tenantId, segmentId }, 'Creating and processing activity result.') + this.log.info({ tenantId, segmentId }, 'Creating and processing activity result.') const payload = { type: IntegrationResultType.ACTIVITY, @@ -142,7 +142,7 @@ export default class DataSinkService extends LoggerBase { if (!resultInfo) { telemetry.increment('data_sync_worker.result_not_found', 1) - this.log.info('Result not found. Skipping...') + this.log.info({ resultId }, 'Result not found. Skipping...') return false } From 72bd97270c11db795d2b2f1fb04a72ff5c17b646 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 10 Jan 2025 23:01:08 +0100 Subject: [PATCH 05/24] fixes --- services/apps/data_sink_worker/src/service/dataSink.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index 6f555b7626..719108e9ce 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -142,7 +142,7 @@ export default class DataSinkService extends LoggerBase { if (!resultInfo) { telemetry.increment('data_sync_worker.result_not_found', 1) - this.log.info({ resultId }, 'Result not found. Skipping...') + this.log.info(`Result not found by id "${resultId}". Skipping...`) return false } From 0bae1c8745e1276ff0cb48f63a8ef6427b689480 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 10 Jan 2025 23:22:23 +0100 Subject: [PATCH 06/24] fix --- .../src/service/activity.service.ts | 27 ++++--------------- .../data_sink_worker/src/service/common.ts | 6 +++++ .../src/service/dataSink.service.ts | 6 ++++- 3 files changed, 16 insertions(+), 23 deletions(-) create mode 100644 services/apps/data_sink_worker/src/service/common.ts diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 9e4c1740fd..70a2d71eae 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -50,16 +50,10 @@ import { import { TEMPORAL_CONFIG } from '../conf' import { IActivityCreateData, IActivityUpdateData, ISentimentActivityInput } from './activity.data' +import { UnrepeatableError } from './common' import MemberService from './member.service' import MemberAffiliationService from './memberAffiliation.service' -export class SuppressedActivityError extends Error { - constructor(message: string) { - super(message) - this.name = 'SuppressedActivityError' - } -} - export default class ActivityService extends LoggerBase { constructor( private readonly pgStore: DbStore, @@ -459,18 +453,9 @@ export default class ActivityService extends LoggerBase { (i) => i.platform === platform && i.type === MemberIdentityType.USERNAME, ) if (!identity) { - if (platform === PlatformType.JIRA) { - throw new SuppressedActivityError( - `Activity's member does not have an identity for the platform: ${platform}!`, - ) - } else { - this.log.error( - "Activity's member does not have an identity for the platform. 
Suppressing it!", - ) - throw new Error( - `Activity's member does not have an identity for the platform: ${platform}!`, - ) - } + throw new UnrepeatableError( + `Activity's member does not have an identity for the platform: ${platform}!`, + ) } username = identity.value @@ -1187,9 +1172,7 @@ export default class ActivityService extends LoggerBase { await this.redisClient.sAdd('organizationIdsForAggComputation', organizationId) } } catch (err) { - if (!(err instanceof SuppressedActivityError)) { - this.log.error(err, 'Error while processing an activity!') - } + this.log.error(err, 'Error while processing an activity!') throw err } } diff --git a/services/apps/data_sink_worker/src/service/common.ts b/services/apps/data_sink_worker/src/service/common.ts new file mode 100644 index 0000000000..84007c4f40 --- /dev/null +++ b/services/apps/data_sink_worker/src/service/common.ts @@ -0,0 +1,6 @@ +export class UnrepeatableError extends Error { + constructor(message: string) { + super(message) + this.name = 'SuppressedActivityError' + } +} diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index 719108e9ce..b7ff422358 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -19,6 +19,7 @@ import { import { WORKER_SETTINGS } from '../conf' import ActivityService from './activity.service' +import { UnrepeatableError } from './common' import MemberService from './member.service' import { OrganizationService } from './organization.service' @@ -55,7 +56,10 @@ export default class DataSinkService extends LoggerBase { errorString: error ? JSON.stringify(error) : undefined, } - if (resultInfo.retries + 1 <= WORKER_SETTINGS().maxStreamRetries) { + if ( + !(error instanceof UnrepeatableError) && + resultInfo.retries + 1 <= WORKER_SETTINGS().maxStreamRetries + ) { // delay for #retries * 2 minutes const until = addSeconds(new Date(), (resultInfo.retries + 1) * 2 * 60) this.log.warn({ until: until.toISOString() }, 'Retrying result!') From 1ee53a8d95a71beace9237f54c50a5038dc38147 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 10 Jan 2025 23:32:28 +0100 Subject: [PATCH 07/24] remove unused trycatch --- .../src/service/activity.service.ts | 1175 ++++++++--------- 1 file changed, 582 insertions(+), 593 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 70a2d71eae..68cf867b9e 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -435,607 +435,397 @@ export default class ActivityService extends LoggerBase { sourceId: activity.sourceId, }) - try { - this.log.debug({ tenantId, integrationId, platform }, 'Processing activity.') + this.log.debug({ tenantId, integrationId, platform }, 'Processing activity.') + + if (!activity.username && !activity.member) { + this.log.error( + { integrationId, platform, activity }, + 'Activity does not have a username or member.', + ) + throw new Error('Activity does not have a username or member.') + } - if (!activity.username && !activity.member) { - this.log.error( - { integrationId, platform, activity }, - 'Activity does not have a username or member.', + let username = activity.username + if (!username) { + const identity = singleOrDefault( + activity.member.identities, + (i) => 
i.platform === platform && i.type === MemberIdentityType.USERNAME, + ) + if (!identity) { + throw new UnrepeatableError( + `Activity's member does not have an identity for the platform: ${platform}!`, ) - throw new Error('Activity does not have a username or member.') } - let username = activity.username - if (!username) { - const identity = singleOrDefault( - activity.member.identities, - (i) => i.platform === platform && i.type === MemberIdentityType.USERNAME, - ) - if (!identity) { - throw new UnrepeatableError( - `Activity's member does not have an identity for the platform: ${platform}!`, - ) - } + username = identity.value + } - username = identity.value + let member = activity.member + if (!member) { + member = { + identities: [ + { + platform, + value: username, + type: MemberIdentityType.USERNAME, + verified: true, + }, + ], } + } - let member = activity.member - if (!member) { - member = { - identities: [ - { - platform, - value: username, - type: MemberIdentityType.USERNAME, - verified: true, - }, - ], - } - } + if (!member.attributes) { + member.attributes = {} + } + + let objectMemberUsername = activity.objectMemberUsername + let objectMember = activity.objectMember - if (!member.attributes) { - member.attributes = {} + if (objectMember && !objectMemberUsername) { + const identity = singleOrDefault( + objectMember.identities, + (i) => i.platform === platform && i.type === MemberIdentityType.USERNAME, + ) + if (!identity) { + this.log.error("Activity's object member does not have an identity for the platform.") + throw new Error( + `Activity's object member does not have an identity for the platform: ${platform}!`, + ) } - let objectMemberUsername = activity.objectMemberUsername - let objectMember = activity.objectMember + objectMemberUsername = identity.value + } else if (objectMemberUsername && !objectMember) { + objectMember = { + identities: [ + { + platform, + value: objectMemberUsername, + type: MemberIdentityType.USERNAME, + verified: true, + }, + ], + } + } - if (objectMember && !objectMemberUsername) { - const identity = singleOrDefault( - objectMember.identities, - (i) => i.platform === platform && i.type === MemberIdentityType.USERNAME, - ) - if (!identity) { - this.log.error("Activity's object member does not have an identity for the platform.") - throw new Error( - `Activity's object member does not have an identity for the platform: ${platform}!`, + const repo = new RequestedForErasureMemberIdentitiesRepository(this.pgStore, this.log) + + // check if member or object member have identities that were requested to be erased by the user + if (member && member.identities.length > 0) { + const toErase = await repo.someIdentitiesWereErasedByUserRequest(member.identities) + if (toErase.length > 0) { + // prevent member/activity creation of one of the identities that are marked to be erased are verified + if (toErase.some((i) => i.verified)) { + this.log.warn( + { memberIdentities: member.identities }, + 'Member has identities that were requested to be erased by the user! 
Skipping activity processing!', ) - } + return + } else { + // we just remove the unverified identities that were marked to be erased and prevent them from being created + member.identities = member.identities.filter((i) => { + if (i.verified) return true - objectMemberUsername = identity.value - } else if (objectMemberUsername && !objectMember) { - objectMember = { - identities: [ - { - platform, - value: objectMemberUsername, - type: MemberIdentityType.USERNAME, - verified: true, - }, - ], - } - } + const maybeToErase = toErase.find( + (e) => e.type === i.type && e.value === i.value && e.platform === i.platform, + ) - const repo = new RequestedForErasureMemberIdentitiesRepository(this.pgStore, this.log) + if (maybeToErase) return false + return true + }) - // check if member or object member have identities that were requested to be erased by the user - if (member && member.identities.length > 0) { - const toErase = await repo.someIdentitiesWereErasedByUserRequest(member.identities) - if (toErase.length > 0) { - // prevent member/activity creation of one of the identities that are marked to be erased are verified - if (toErase.some((i) => i.verified)) { + if (member.identities.filter((i) => i.value).length === 0) { this.log.warn( - { memberIdentities: member.identities }, - 'Member has identities that were requested to be erased by the user! Skipping activity processing!', + 'Member had at least one unverified identity removed as it was requested to be removed! Now there is no identities left - skipping processing!', ) return - } else { - // we just remove the unverified identities that were marked to be erased and prevent them from being created - member.identities = member.identities.filter((i) => { - if (i.verified) return true - - const maybeToErase = toErase.find( - (e) => e.type === i.type && e.value === i.value && e.platform === i.platform, - ) - - if (maybeToErase) return false - return true - }) - - if (member.identities.filter((i) => i.value).length === 0) { - this.log.warn( - 'Member had at least one unverified identity removed as it was requested to be removed! Now there is no identities left - skipping processing!', - ) - return - } } } } + } - if (objectMember && objectMember.identities.length > 0) { - const toErase = await repo.someIdentitiesWereErasedByUserRequest(objectMember.identities) - if (toErase.length > 0) { - // prevent member/activity creation of one of the identities that are marked to be erased are verified - if (toErase.some((i) => i.verified)) { - this.log.warn( - { objectMemberIdentities: objectMember.identities }, - 'Object member has identities that were requested to be erased by the user! Skipping activity processing!', - ) - return - } else { - // we just remove the unverified identities that were marked to be erased and prevent them from being created - objectMember.identities = objectMember.identities.filter((i) => { - if (i.verified) return true + if (objectMember && objectMember.identities.length > 0) { + const toErase = await repo.someIdentitiesWereErasedByUserRequest(objectMember.identities) + if (toErase.length > 0) { + // prevent member/activity creation of one of the identities that are marked to be erased are verified + if (toErase.some((i) => i.verified)) { + this.log.warn( + { objectMemberIdentities: objectMember.identities }, + 'Object member has identities that were requested to be erased by the user! 
Skipping activity processing!', + ) + return + } else { + // we just remove the unverified identities that were marked to be erased and prevent them from being created + objectMember.identities = objectMember.identities.filter((i) => { + if (i.verified) return true - const maybeToErase = toErase.find( - (e) => e.type === i.type && e.value === i.value && e.platform === i.platform, - ) + const maybeToErase = toErase.find( + (e) => e.type === i.type && e.value === i.value && e.platform === i.platform, + ) - if (maybeToErase) return false - return true - }) + if (maybeToErase) return false + return true + }) - if (objectMember.identities.filter((i) => i.value).length === 0) { - this.log.warn( - 'Object member had at least one unverified identity removed as it was requested to be removed! Now there is no identities left - skipping processing!', - ) - return - } + if (objectMember.identities.filter((i) => i.value).length === 0) { + this.log.warn( + 'Object member had at least one unverified identity removed as it was requested to be removed! Now there is no identities left - skipping processing!', + ) + return } } } + } - let memberId: string - let objectMemberId: string | undefined - let memberIsBot = false - let memberIsTeamMember = false - let segmentId: string - let organizationId: string - - const memberAttValue = (attName: MemberAttributeName, dbMember?: IDbMember): unknown => { - let result: unknown - if (dbMember && dbMember.attributes[attName]) { - // db member already has this attribute - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const att = dbMember.attributes[attName] as any - // if it's manually set we use that - if (att.custom) { - // manually set - result = att.custom + let memberId: string + let objectMemberId: string | undefined + let memberIsBot = false + let memberIsTeamMember = false + let segmentId: string + let organizationId: string + + const memberAttValue = (attName: MemberAttributeName, dbMember?: IDbMember): unknown => { + let result: unknown + if (dbMember && dbMember.attributes[attName]) { + // db member already has this attribute + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const att = dbMember.attributes[attName] as any + // if it's manually set we use that + if (att.custom) { + // manually set + result = att.custom + } else { + // if it's not manually set we check if incoming member data has the attribute set for the platform + if (member.attributes[attName] && member.attributes[attName][platform]) { + result = member.attributes[attName][platform] } else { - // if it's not manually set we check if incoming member data has the attribute set for the platform - if (member.attributes[attName] && member.attributes[attName][platform]) { - result = member.attributes[attName][platform] - } else { - // if none of those work we just use db member attribute default value - result = att.default - } + // if none of those work we just use db member attribute default value + result = att.default } - } else if (member.attributes[attName] && member.attributes[attName][platform]) { - result = member.attributes[attName][platform] } - - return result + } else if (member.attributes[attName] && member.attributes[attName][platform]) { + result = member.attributes[attName][platform] } - await this.pgStore.transactionally(async (txStore) => { - try { - const txMemberRepo = new MemberRepository(txStore, this.log) - const txMemberService = new MemberService( - txStore, - this.searchSyncWorkerEmitter, - this.temporal, - this.redisClient, - 
this.log, - ) - const txActivityService = new ActivityService( - txStore, - this.qdbStore, - this.searchSyncWorkerEmitter, - this.redisClient, - this.temporal, - this.log, - ) - const txIntegrationRepo = new IntegrationRepository(txStore, this.log) - const txMemberAffiliationService = new MemberAffiliationService(txStore, this.log) - const txGithubReposRepo = new GithubReposRepository(txStore, this.log) - const txGitlabReposRepo = new GitlabReposRepository(txStore, this.log) - - segmentId = providedSegmentId - if (!segmentId) { - const dbIntegration = await txIntegrationRepo.findById(integrationId) - const repoSegmentId = await txGithubReposRepo.findSegmentForRepo( - tenantId, - activity.channel, - ) - const gitlabRepoSegmentId = await txGitlabReposRepo.findSegmentForRepo( - tenantId, - activity.channel, - ) - - if (platform === PlatformType.GITLAB && gitlabRepoSegmentId) { - segmentId = gitlabRepoSegmentId - } else if (platform === PlatformType.GITHUB && repoSegmentId) { - segmentId = repoSegmentId - } else { - segmentId = dbIntegration.segmentId - } - } + return result + } - // find existing activity - const { - rows: [dbActivity], - } = await queryActivities(this.qdbStore.connection(), { + await this.pgStore.transactionally(async (txStore) => { + try { + const txMemberRepo = new MemberRepository(txStore, this.log) + const txMemberService = new MemberService( + txStore, + this.searchSyncWorkerEmitter, + this.temporal, + this.redisClient, + this.log, + ) + const txActivityService = new ActivityService( + txStore, + this.qdbStore, + this.searchSyncWorkerEmitter, + this.redisClient, + this.temporal, + this.log, + ) + const txIntegrationRepo = new IntegrationRepository(txStore, this.log) + const txMemberAffiliationService = new MemberAffiliationService(txStore, this.log) + const txGithubReposRepo = new GithubReposRepository(txStore, this.log) + const txGitlabReposRepo = new GitlabReposRepository(txStore, this.log) + + segmentId = providedSegmentId + if (!segmentId) { + const dbIntegration = await txIntegrationRepo.findById(integrationId) + const repoSegmentId = await txGithubReposRepo.findSegmentForRepo( tenantId, - segmentIds: [segmentId], - filter: { - and: [ - { timestamp: { eq: activity.timestamp } }, - { sourceId: { eq: activity.sourceId } }, - { platform: { eq: platform } }, - { type: { eq: activity.type } }, - { channel: { eq: activity.channel } }, - ], - }, - limit: 1, - }) + activity.channel, + ) + const gitlabRepoSegmentId = await txGitlabReposRepo.findSegmentForRepo( + tenantId, + activity.channel, + ) - if (dbActivity && dbActivity?.deletedAt) { - // we found an existing activity but it's deleted - nothing to do here - this.log.trace( - { activityId: dbActivity.id }, - 'Found existing activity but it is deleted, nothing to do here.', - ) - return + if (platform === PlatformType.GITLAB && gitlabRepoSegmentId) { + segmentId = gitlabRepoSegmentId + } else if (platform === PlatformType.GITHUB && repoSegmentId) { + segmentId = repoSegmentId + } else { + segmentId = dbIntegration.segmentId } + } - let createActivity = false - - if (dbActivity) { - this.log.trace({ activityId: dbActivity.id }, 'Found existing activity. 
Updating it.') - // process member data - - let dbMember = await txMemberRepo.findMemberByUsername( - tenantId, - segmentId, - platform, - username, - ) - if (dbMember) { - // we found a member for the identity from the activity - this.log.trace({ memberId: dbMember.id }, 'Found existing member.') - - // lets check if it's a match from what we have in the database activity that we got through sourceId - if (dbActivity.memberId !== dbMember.id) { - // the memberId from the dbActivity does not match the one we found from the identity - // we should remove the activity and let it recreate itself with the correct member - // this is probably a legacy problem before we had weak identities - this.log.warn( - { - activityMemberId: dbActivity.memberId, - memberId: dbMember.id, - activityType: activity.type, - }, - 'Exiting activity has a memberId that does not match the memberId for the platform:username identity! Deleting the activity!', - ) - - createActivity = true - } + // find existing activity + const { + rows: [dbActivity], + } = await queryActivities(this.qdbStore.connection(), { + tenantId, + segmentIds: [segmentId], + filter: { + and: [ + { timestamp: { eq: activity.timestamp } }, + { sourceId: { eq: activity.sourceId } }, + { platform: { eq: platform } }, + { type: { eq: activity.type } }, + { channel: { eq: activity.channel } }, + ], + }, + limit: 1, + }) + + if (dbActivity && dbActivity?.deletedAt) { + // we found an existing activity but it's deleted - nothing to do here + this.log.trace( + { activityId: dbActivity.id }, + 'Found existing activity but it is deleted, nothing to do here.', + ) + return + } - // update the member - await txMemberService.update( - dbMember.id, - tenantId, - onboarding, - segmentId, - integrationId, - { - attributes: member.attributes, - joinedAt: member.joinedAt - ? new Date(member.joinedAt) - : new Date(activity.timestamp), - identities: member.identities, - organizations: member.organizations, - reach: member.reach, - }, - dbMember, - platform, - false, - ) + let createActivity = false - if (!createActivity) { - // and use it's member id for the new activity - dbActivity.memberId = dbMember.id - } + if (dbActivity) { + this.log.trace({ activityId: dbActivity.id }, 'Found existing activity. Updating it.') + // process member data - memberId = dbMember.id - // determine isBot and isTeamMember - memberIsBot = - (memberAttValue(MemberAttributeName.IS_BOT, dbMember) as boolean) ?? false - memberIsTeamMember = - (memberAttValue(MemberAttributeName.IS_TEAM_MEMBER, dbMember) as boolean) ?? false - } else { - this.log.trace( - 'We did not find a member for the identity provided! 
Updating the one from db activity.', - ) - // we did not find a member for the identity from the activity - // which is weird since the memberId from the activity points to some member - // that does not have the identity from the new activity - // we should add the activity to the member - // merge member data with the one from the activity and the one from the database - // leave activity.memberId as is - - dbMember = await txMemberRepo.findById(dbActivity.memberId) - await txMemberService.update( - dbMember.id, - tenantId, - onboarding, - segmentId, - integrationId, + let dbMember = await txMemberRepo.findMemberByUsername( + tenantId, + segmentId, + platform, + username, + ) + if (dbMember) { + // we found a member for the identity from the activity + this.log.trace({ memberId: dbMember.id }, 'Found existing member.') + + // lets check if it's a match from what we have in the database activity that we got through sourceId + if (dbActivity.memberId !== dbMember.id) { + // the memberId from the dbActivity does not match the one we found from the identity + // we should remove the activity and let it recreate itself with the correct member + // this is probably a legacy problem before we had weak identities + this.log.warn( { - attributes: member.attributes, - joinedAt: member.joinedAt - ? new Date(member.joinedAt) - : new Date(activity.timestamp), - identities: member.identities, - organizations: member.organizations, - reach: member.reach, + activityMemberId: dbActivity.memberId, + memberId: dbMember.id, + activityType: activity.type, }, - dbMember, - platform, - false, + 'Exiting activity has a memberId that does not match the memberId for the platform:username identity! Deleting the activity!', ) - memberId = dbActivity.memberId - // determine isBot and isTeamMember - memberIsBot = - (memberAttValue(MemberAttributeName.IS_BOT, dbMember) as boolean) ?? false - memberIsTeamMember = - (memberAttValue(MemberAttributeName.IS_TEAM_MEMBER, dbMember) as boolean) ?? false + createActivity = true } - // process object member data - // existing activity has it but now we don't anymore - if (dbActivity.objectMemberId && !objectMember) { - // TODO what to do here? - throw new Error( - `Activity ${dbActivity.id} has an object member but newly generated one does not!`, - ) - } - - if (objectMember) { - if (dbActivity.objectMemberId) { - let dbObjectMember = await txMemberRepo.findMemberByUsername( - tenantId, - segmentId, - platform, - objectMemberUsername, - ) - - if (dbObjectMember) { - // we found an existing object member for the identity from the activity - this.log.trace( - { objectMemberId: dbObjectMember.id }, - 'Found existing object member.', - ) - - // lets check if it's a match from what we have in the database activity that we got through sourceId - if (dbActivity.objectMemberId !== dbObjectMember.id) { - // the memberId from the dbActivity does not match the one we found from the identity - // we should remove the activity and let it recreate itself with the correct member - // this is probably a legacy problem before we had weak identities - this.log.warn( - { - activityObjectMemberId: dbActivity.objectMemberId, - objectMemberId: dbObjectMember.id, - activityType: activity.type, - }, - 'Exiting activity has a objectMemberId that does not match the object member for the platform:username identity! 
Deleting the activity!', - ) - - createActivity = true - } - - // update the member - await txMemberService.update( - dbObjectMember.id, - tenantId, - onboarding, - segmentId, - integrationId, - { - attributes: objectMember.attributes, - joinedAt: objectMember.joinedAt - ? new Date(objectMember.joinedAt) - : new Date(activity.timestamp), - identities: objectMember.identities, - organizations: objectMember.organizations, - reach: member.reach, - }, - dbObjectMember, - platform, - false, - ) - - if (!createActivity) { - // and use it's member id for the new activity - dbActivity.objectMemberId = dbObjectMember.id - } - - objectMemberId = dbObjectMember.id - } else { - this.log.trace( - 'We did not find a object member for the identity provided! Updating the one from db activity.', - ) - // we did not find a member for the identity from the activity - // which is weird since the memberId from the activity points to some member - // that does not have the identity from the new activity - // we should add the activity to the member - // merge member data with the one from the activity and the one from the database - // leave activity.memberId as is - - dbObjectMember = await txMemberRepo.findById(dbActivity.objectMemberId) - await txMemberService.update( - dbObjectMember.id, - tenantId, - onboarding, - segmentId, - integrationId, - { - attributes: objectMember.attributes, - joinedAt: objectMember.joinedAt - ? new Date(objectMember.joinedAt) - : new Date(activity.timestamp), - identities: objectMember.identities, - organizations: objectMember.organizations, - reach: member.reach, - }, - dbObjectMember, - platform, - false, - ) - - objectMemberId = dbActivity.objectMemberId - } - } - } + // update the member + await txMemberService.update( + dbMember.id, + tenantId, + onboarding, + segmentId, + integrationId, + { + attributes: member.attributes, + joinedAt: member.joinedAt + ? new Date(member.joinedAt) + : new Date(activity.timestamp), + identities: member.identities, + organizations: member.organizations, + reach: member.reach, + }, + dbMember, + platform, + false, + ) if (!createActivity) { - organizationId = await txMemberAffiliationService.findAffiliation( - dbActivity.memberId, - segmentId, - dbActivity.timestamp, - ) - - // just update the activity now - await txActivityService.update( - dbActivity.id, - tenantId, - onboarding, - segmentId, - { - type: activity.type, - isContribution: activity.isContribution, - score: activity.score, - sourceId: activity.sourceId, - sourceParentId: activity.sourceParentId, - memberId: dbActivity.memberId, - username, - objectMemberId, - objectMemberUsername, - attributes: activity.attributes || {}, - body: activity.body, - title: activity.title, - channel: activity.channel, - url: activity.url, - organizationId, - platform: - platform === PlatformType.GITHUB && dbActivity.platform === PlatformType.GIT - ? PlatformType.GITHUB - : (dbActivity.platform as PlatformType), - }, - dbActivity, - { - isBot: memberIsBot ?? false, - isTeamMember: memberIsTeamMember ?? false, - }, - false, - ) + // and use it's member id for the new activity + dbActivity.memberId = dbMember.id } - // release lock for member inside activity exists - this migth be redundant, but just in case + memberId = dbMember.id + // determine isBot and isTeamMember + memberIsBot = (memberAttValue(MemberAttributeName.IS_BOT, dbMember) as boolean) ?? false + memberIsTeamMember = + (memberAttValue(MemberAttributeName.IS_TEAM_MEMBER, dbMember) as boolean) ?? 
false } else { - this.log.trace('We did not find an existing activity. Creating a new one.') - createActivity = true - - // we don't have the activity yet in the database - // check if we have a member for the identity from the activity - let dbMember = await txMemberRepo.findMemberByUsername( + this.log.trace( + 'We did not find a member for the identity provided! Updating the one from db activity.', + ) + // we did not find a member for the identity from the activity + // which is weird since the memberId from the activity points to some member + // that does not have the identity from the new activity + // we should add the activity to the member + // merge member data with the one from the activity and the one from the database + // leave activity.memberId as is + + dbMember = await txMemberRepo.findById(dbActivity.memberId) + await txMemberService.update( + dbMember.id, tenantId, + onboarding, segmentId, + integrationId, + { + attributes: member.attributes, + joinedAt: member.joinedAt + ? new Date(member.joinedAt) + : new Date(activity.timestamp), + identities: member.identities, + organizations: member.organizations, + reach: member.reach, + }, + dbMember, platform, - username, + false, ) - // try to find a member by email if verified one is available - if (!dbMember) { - const emails = member.identities - .filter((i) => i.verified && i.type === MemberIdentityType.EMAIL) - .map((i) => i.value) - - if (emails.length > 0) { - for (const email of emails) { - dbMember = await txMemberRepo.findMemberByEmail(tenantId, email) - - if (dbMember) { - break - } - } - } - } - - if (dbMember) { - this.log.trace({ memberId: dbMember.id }, 'Found existing member.') - await txMemberService.update( - dbMember.id, - tenantId, - onboarding, - segmentId, - integrationId, - { - attributes: member.attributes, - joinedAt: member.joinedAt - ? new Date(member.joinedAt) - : new Date(activity.timestamp), - identities: member.identities, - organizations: member.organizations, - reach: member.reach, - }, - dbMember, - platform, - false, - ) - memberId = dbMember.id - // determine isBot and isTeamMember - memberIsBot = - (memberAttValue(MemberAttributeName.IS_BOT, dbMember) as boolean) ?? false - memberIsTeamMember = - (memberAttValue(MemberAttributeName.IS_TEAM_MEMBER, dbMember) as boolean) ?? false - } else { - this.log.trace( - 'We did not find a member for the identity provided! Creating a new one.', - ) - memberId = await txMemberService.create( - tenantId, - onboarding, - segmentId, - integrationId, - { - displayName: member.displayName || username, - attributes: member.attributes, - joinedAt: member.joinedAt - ? new Date(member.joinedAt) - : new Date(activity.timestamp), - identities: member.identities, - organizations: member.organizations, - reach: member.reach, - }, - platform, - false, - ) - } + memberId = dbActivity.memberId // determine isBot and isTeamMember - memberIsBot = (memberAttValue(MemberAttributeName.IS_BOT) as boolean) ?? false + memberIsBot = (memberAttValue(MemberAttributeName.IS_BOT, dbMember) as boolean) ?? false memberIsTeamMember = - (memberAttValue(MemberAttributeName.IS_TEAM_MEMBER) as boolean) ?? false + (memberAttValue(MemberAttributeName.IS_TEAM_MEMBER, dbMember) as boolean) ?? 
false + } - if (objectMember) { - // we don't have the activity yet in the database - // check if we have an object member for the identity from the activity + // process object member data + // existing activity has it but now we don't anymore + if (dbActivity.objectMemberId && !objectMember) { + // TODO what to do here? + throw new Error( + `Activity ${dbActivity.id} has an object member but newly generated one does not!`, + ) + } - const dbObjectMember = await txMemberRepo.findMemberByUsername( + if (objectMember) { + if (dbActivity.objectMemberId) { + let dbObjectMember = await txMemberRepo.findMemberByUsername( tenantId, segmentId, platform, objectMemberUsername, ) + if (dbObjectMember) { + // we found an existing object member for the identity from the activity this.log.trace( { objectMemberId: dbObjectMember.id }, 'Found existing object member.', ) + + // lets check if it's a match from what we have in the database activity that we got through sourceId + if (dbActivity.objectMemberId !== dbObjectMember.id) { + // the memberId from the dbActivity does not match the one we found from the identity + // we should remove the activity and let it recreate itself with the correct member + // this is probably a legacy problem before we had weak identities + this.log.warn( + { + activityObjectMemberId: dbActivity.objectMemberId, + objectMemberId: dbObjectMember.id, + activityType: activity.type, + }, + 'Exiting activity has a objectMemberId that does not match the object member for the platform:username identity! Deleting the activity!', + ) + + createActivity = true + } + + // update the member await txMemberService.update( dbObjectMember.id, tenantId, @@ -1055,18 +845,32 @@ export default class ActivityService extends LoggerBase { platform, false, ) + + if (!createActivity) { + // and use it's member id for the new activity + dbActivity.objectMemberId = dbObjectMember.id + } + objectMemberId = dbObjectMember.id } else { this.log.trace( - 'We did not find a member for the identity provided! Creating a new one.', + 'We did not find a object member for the identity provided! Updating the one from db activity.', ) - objectMemberId = await txMemberService.create( + // we did not find a member for the identity from the activity + // which is weird since the memberId from the activity points to some member + // that does not have the identity from the new activity + // we should add the activity to the member + // merge member data with the one from the activity and the one from the database + // leave activity.memberId as is + + dbObjectMember = await txMemberRepo.findById(dbActivity.objectMemberId) + await txMemberService.update( + dbObjectMember.id, tenantId, onboarding, segmentId, integrationId, { - displayName: objectMember.displayName || username, attributes: objectMember.attributes, joinedAt: objectMember.joinedAt ? new Date(objectMember.joinedAt) @@ -1075,105 +879,290 @@ export default class ActivityService extends LoggerBase { organizations: objectMember.organizations, reach: member.reach, }, + dbObjectMember, platform, false, ) + + objectMemberId = dbActivity.objectMemberId } } } - const activityId = dbActivity?.id ?? 
generateUUIDv4() - if (createActivity) { + if (!createActivity) { organizationId = await txMemberAffiliationService.findAffiliation( - memberId, + dbActivity.memberId, segmentId, - activity.timestamp, + dbActivity.timestamp, ) - await txActivityService.create( + // just update the activity now + await txActivityService.update( + dbActivity.id, tenantId, + onboarding, segmentId, { - id: activityId, type: activity.type, - platform, - timestamp: new Date(activity.timestamp), - sourceId: activity.sourceId, isContribution: activity.isContribution, score: activity.score, - sourceParentId: - platform === PlatformType.GITHUB && - activity.type === GithubActivityType.AUTHORED_COMMIT && - activity.sourceParentId - ? await findMatchingPullRequestNodeId( - this.qdbStore.connection(), - tenantId, - activity, - ) - : activity.sourceParentId, - memberId, + sourceId: activity.sourceId, + sourceParentId: activity.sourceParentId, + memberId: dbActivity.memberId, username, objectMemberId, objectMemberUsername, - attributes: - platform === PlatformType.GITHUB && - activity.type === GithubActivityType.AUTHORED_COMMIT - ? await this.findMatchingGitActivityAttributes({ - tenantId, - segmentId, - activity, - attributes: activity.attributes || {}, - }) - : activity.attributes || {}, + attributes: activity.attributes || {}, body: activity.body, title: activity.title, channel: activity.channel, url: activity.url, organizationId, + platform: + platform === PlatformType.GITHUB && dbActivity.platform === PlatformType.GIT + ? PlatformType.GITHUB + : (dbActivity.platform as PlatformType), }, + dbActivity, { isBot: memberIsBot ?? false, isTeamMember: memberIsTeamMember ?? false, }, + false, ) } - if (platform === PlatformType.GIT && activity.type === GitActivityType.AUTHORED_COMMIT) { - await this.pushAttributesToMatchingGithubActivity({ tenantId, segmentId, activity }) - } else if ( - platform === PlatformType.GITHUB && - activity.type === GithubActivityType.PULL_REQUEST_OPENED - ) { - await this.pushPRSourceIdToMatchingGithubCommits({ tenantId, activity }) + // release lock for member inside activity exists - this migth be redundant, but just in case + } else { + this.log.trace('We did not find an existing activity. Creating a new one.') + createActivity = true + + // we don't have the activity yet in the database + // check if we have a member for the identity from the activity + let dbMember = await txMemberRepo.findMemberByUsername( + tenantId, + segmentId, + platform, + username, + ) + + // try to find a member by email if verified one is available + if (!dbMember) { + const emails = member.identities + .filter((i) => i.verified && i.type === MemberIdentityType.EMAIL) + .map((i) => i.value) + + if (emails.length > 0) { + for (const email of emails) { + dbMember = await txMemberRepo.findMemberByEmail(tenantId, email) + + if (dbMember) { + break + } + } + } + } + + if (dbMember) { + this.log.trace({ memberId: dbMember.id }, 'Found existing member.') + await txMemberService.update( + dbMember.id, + tenantId, + onboarding, + segmentId, + integrationId, + { + attributes: member.attributes, + joinedAt: member.joinedAt + ? new Date(member.joinedAt) + : new Date(activity.timestamp), + identities: member.identities, + organizations: member.organizations, + reach: member.reach, + }, + dbMember, + platform, + false, + ) + memberId = dbMember.id + // determine isBot and isTeamMember + memberIsBot = (memberAttValue(MemberAttributeName.IS_BOT, dbMember) as boolean) ?? 
false + memberIsTeamMember = + (memberAttValue(MemberAttributeName.IS_TEAM_MEMBER, dbMember) as boolean) ?? false + } else { + this.log.trace( + 'We did not find a member for the identity provided! Creating a new one.', + ) + memberId = await txMemberService.create( + tenantId, + onboarding, + segmentId, + integrationId, + { + displayName: member.displayName || username, + attributes: member.attributes, + joinedAt: member.joinedAt + ? new Date(member.joinedAt) + : new Date(activity.timestamp), + identities: member.identities, + organizations: member.organizations, + reach: member.reach, + }, + platform, + false, + ) + } + // determine isBot and isTeamMember + memberIsBot = (memberAttValue(MemberAttributeName.IS_BOT) as boolean) ?? false + memberIsTeamMember = + (memberAttValue(MemberAttributeName.IS_TEAM_MEMBER) as boolean) ?? false + + if (objectMember) { + // we don't have the activity yet in the database + // check if we have an object member for the identity from the activity + + const dbObjectMember = await txMemberRepo.findMemberByUsername( + tenantId, + segmentId, + platform, + objectMemberUsername, + ) + if (dbObjectMember) { + this.log.trace({ objectMemberId: dbObjectMember.id }, 'Found existing object member.') + await txMemberService.update( + dbObjectMember.id, + tenantId, + onboarding, + segmentId, + integrationId, + { + attributes: objectMember.attributes, + joinedAt: objectMember.joinedAt + ? new Date(objectMember.joinedAt) + : new Date(activity.timestamp), + identities: objectMember.identities, + organizations: objectMember.organizations, + reach: member.reach, + }, + dbObjectMember, + platform, + false, + ) + objectMemberId = dbObjectMember.id + } else { + this.log.trace( + 'We did not find a member for the identity provided! Creating a new one.', + ) + objectMemberId = await txMemberService.create( + tenantId, + onboarding, + segmentId, + integrationId, + { + displayName: objectMember.displayName || username, + attributes: objectMember.attributes, + joinedAt: objectMember.joinedAt + ? new Date(objectMember.joinedAt) + : new Date(activity.timestamp), + identities: objectMember.identities, + organizations: objectMember.organizations, + reach: member.reach, + }, + platform, + false, + ) + } } - } finally { - // release locks matter what } - }) - if (memberId) { - await this.searchSyncWorkerEmitter.triggerMemberSync( - tenantId, - memberId, - onboarding, - segmentId, - ) - } - if (objectMemberId) { - await this.searchSyncWorkerEmitter.triggerMemberSync( - tenantId, - objectMemberId, - onboarding, - segmentId, - ) - } + const activityId = dbActivity?.id ?? generateUUIDv4() + if (createActivity) { + organizationId = await txMemberAffiliationService.findAffiliation( + memberId, + segmentId, + activity.timestamp, + ) + + await txActivityService.create( + tenantId, + segmentId, + { + id: activityId, + type: activity.type, + platform, + timestamp: new Date(activity.timestamp), + sourceId: activity.sourceId, + isContribution: activity.isContribution, + score: activity.score, + sourceParentId: + platform === PlatformType.GITHUB && + activity.type === GithubActivityType.AUTHORED_COMMIT && + activity.sourceParentId + ? await findMatchingPullRequestNodeId( + this.qdbStore.connection(), + tenantId, + activity, + ) + : activity.sourceParentId, + memberId, + username, + objectMemberId, + objectMemberUsername, + attributes: + platform === PlatformType.GITHUB && + activity.type === GithubActivityType.AUTHORED_COMMIT + ? 
await this.findMatchingGitActivityAttributes({ + tenantId, + segmentId, + activity, + attributes: activity.attributes || {}, + }) + : activity.attributes || {}, + body: activity.body, + title: activity.title, + channel: activity.channel, + url: activity.url, + organizationId, + }, + { + isBot: memberIsBot ?? false, + isTeamMember: memberIsTeamMember ?? false, + }, + ) + } - if (organizationId) { - await this.redisClient.sAdd('organizationIdsForAggComputation', organizationId) + if (platform === PlatformType.GIT && activity.type === GitActivityType.AUTHORED_COMMIT) { + await this.pushAttributesToMatchingGithubActivity({ tenantId, segmentId, activity }) + } else if ( + platform === PlatformType.GITHUB && + activity.type === GithubActivityType.PULL_REQUEST_OPENED + ) { + await this.pushPRSourceIdToMatchingGithubCommits({ tenantId, activity }) + } + } finally { + // release locks matter what } - } catch (err) { - this.log.error(err, 'Error while processing an activity!') - throw err + }) + + if (memberId) { + await this.searchSyncWorkerEmitter.triggerMemberSync( + tenantId, + memberId, + onboarding, + segmentId, + ) + } + if (objectMemberId) { + await this.searchSyncWorkerEmitter.triggerMemberSync( + tenantId, + objectMemberId, + onboarding, + segmentId, + ) + } + + if (organizationId) { + await this.redisClient.sAdd('organizationIdsForAggComputation', organizationId) } } From a9a3ee062b4503171ec123d784b6b44162e6f702 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sat, 11 Jan 2025 09:24:54 +0100 Subject: [PATCH 08/24] concurrent processing --- .../apps/data_sink_worker/src/queue/index.ts | 2 +- .../data_sink_worker/src/service/common.ts | 2 +- .../libs/queue/src/vendors/kafka/client.ts | 29 +++++++++++++------ 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index 8aa8eb4465..45d126e620 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -30,7 +30,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { level, client, client.getQueueChannelConfig(CrowdQueue.DATA_SINK_WORKER), - 20, + 5, parentLog, undefined, undefined, diff --git a/services/apps/data_sink_worker/src/service/common.ts b/services/apps/data_sink_worker/src/service/common.ts index 84007c4f40..3b1405776a 100644 --- a/services/apps/data_sink_worker/src/service/common.ts +++ b/services/apps/data_sink_worker/src/service/common.ts @@ -1,6 +1,6 @@ export class UnrepeatableError extends Error { constructor(message: string) { super(message) - this.name = 'SuppressedActivityError' + this.name = 'UnrepeatableError' } } diff --git a/services/libs/queue/src/vendors/kafka/client.ts b/services/libs/queue/src/vendors/kafka/client.ts index d3704d1819..1b74b3da7f 100644 --- a/services/libs/queue/src/vendors/kafka/client.ts +++ b/services/libs/queue/src/vendors/kafka/client.ts @@ -319,19 +319,30 @@ export class KafkaQueueService extends LoggerBase implements IQueue { }, 10 * 60000) // Check every 10 minutes this.log.trace({ topic: queueConf.name }, 'Subscribed to topic! 
Starting the consmer...') + await consumer.run({ eachMessage: async ({ message }) => { if (message && message.value) { - const data = JSON.parse(message.value.toString()) - const now = performance.now() - try { - await processMessage(data) - const duration = performance.now() - now - this.log.info(`Message processed successfully in ${duration.toFixed(2)}ms!`) - } catch (err) { - const duration = performance.now() - now - this.log.error(err, `Message processed unsuccessfully in ${duration.toFixed(2)}ms!`) + while (!this.isAvailable(maxConcurrentMessageProcessing)) { + await timeout(10) } + const now = performance.now() + + this.addJob() + const data = JSON.parse(message.value.toString()) + + processMessage(data) + .then(() => { + const duration = performance.now() - now + this.log.info(`Message processed successfully in ${duration.toFixed(2)}ms!`) + }) + .catch((err) => { + const duration = performance.now() - now + this.log.error(err, `Message processed unsuccessfully in ${duration.toFixed(2)}ms!`) + }) + .finally(() => { + this.removeJob() + }) } }, }) From 71a006bd22c8060a81fce2ea70bf87846c99d673 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sat, 11 Jan 2025 09:38:45 +0100 Subject: [PATCH 09/24] concurrent processing --- services/apps/data_sink_worker/src/queue/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index 45d126e620..5deaa12fa3 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -30,7 +30,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { level, client, client.getQueueChannelConfig(CrowdQueue.DATA_SINK_WORKER), - 5, + 3, parentLog, undefined, undefined, From a868961b6cb95834cd9ecffbef487dd8b988882d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sun, 12 Jan 2025 09:51:40 +0100 Subject: [PATCH 10/24] statistics logging --- .../libs/queue/src/vendors/kafka/client.ts | 67 ++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) diff --git a/services/libs/queue/src/vendors/kafka/client.ts b/services/libs/queue/src/vendors/kafka/client.ts index 1b74b3da7f..5fadc3b8f3 100644 --- a/services/libs/queue/src/vendors/kafka/client.ts +++ b/services/libs/queue/src/vendors/kafka/client.ts @@ -42,6 +42,38 @@ export class KafkaQueueService extends LoggerBase implements IQueue { this.reconnectAttempts = new Map() this.consumerStatus = new Map() } + async getQueueMessageCount(conf: IKafkaChannelConfig): Promise { + const groupId = conf.name + const topic = conf.name + + const admin = this.client.admin() + await admin.connect() + + try { + const topicOffsets = await admin.fetchTopicOffsets(topic) + const offsetsResponse = await admin.fetchOffsets({ + groupId: groupId, + topics: [topic], + }) + + const offsets = offsetsResponse[0].partitions + + let totalLeft = 0 + for (const offset of offsets) { + const topicOffset = topicOffsets.find((p) => p.partition === offset.partition) + if (topicOffset.offset !== offset.offset) { + totalLeft += Number(topicOffset.offset) - Number(offset.offset) + } + } + + return totalLeft + } catch (err) { + this.log.error(err, 'Failed to get message count!') + throw err + } finally { + await admin.disconnect() + } + } public async send( channel: IQueueChannel, @@ -294,6 +326,7 @@ export class KafkaQueueService extends LoggerBase implements IQueue { let retries = options?.retry || 0 let 
healthCheckInterval + let statisticsInterval try { this.started = true @@ -318,6 +351,35 @@ export class KafkaQueueService extends LoggerBase implements IQueue { } }, 10 * 60000) // Check every 10 minutes + let timings = [] + + statisticsInterval = setInterval(async () => { + if (!this.started) { + clearInterval(statisticsInterval) + return + } + + try { + // Reset the timings array and calculate the average processing time + const durations = [...timings] + timings = [] + + if (durations.length > 0) { + const average = durations.reduce((a, b) => a + b, 0) / durations.length + this.log.info( + { topic: queueConf.name }, + `In the last minute ${durations.length} messages were processed - average processing time: ${average.toFixed(2)}ms!`, + ) + } + + // Get the number of messages left in the queue + const count = await this.getQueueMessageCount(queueConf) + this.log.info({ topic: queueConf.name }, `Topic has ${count} messages left!`) + } catch (err) { + // do nothing + } + }, 60000) // check every minute + this.log.trace({ topic: queueConf.name }, 'Subscribed to topic! Starting the consmer...') await consumer.run({ @@ -334,10 +396,12 @@ export class KafkaQueueService extends LoggerBase implements IQueue { processMessage(data) .then(() => { const duration = performance.now() - now - this.log.info(`Message processed successfully in ${duration.toFixed(2)}ms!`) + timings.push(duration) + this.log.debug(`Message processed successfully in ${duration.toFixed(2)}ms!`) }) .catch((err) => { const duration = performance.now() - now + timings.push(duration) this.log.error(err, `Message processed unsuccessfully in ${duration.toFixed(2)}ms!`) }) .finally(() => { @@ -349,6 +413,7 @@ export class KafkaQueueService extends LoggerBase implements IQueue { } catch (e) { this.log.trace({ topic: queueConf.name, error: e }, 'Failed to start the queue!') clearInterval(healthCheckInterval) + clearInterval(statisticsInterval) if (retries < MAX_RETRY_FOR_CONNECTING_CONSUMER) { retries++ this.log.trace({ topic: queueConf.name, retries }, 'Retrying to start the queue...') From 5dbe45de7f5c1c1b9cc8dbea9b7626da1507b61c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sun, 12 Jan 2025 10:37:00 +0100 Subject: [PATCH 11/24] statistics logging --- services/libs/queue/src/vendors/kafka/client.ts | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/services/libs/queue/src/vendors/kafka/client.ts b/services/libs/queue/src/vendors/kafka/client.ts index 5fadc3b8f3..80fd8c76e4 100644 --- a/services/libs/queue/src/vendors/kafka/client.ts +++ b/services/libs/queue/src/vendors/kafka/client.ts @@ -364,17 +364,15 @@ export class KafkaQueueService extends LoggerBase implements IQueue { const durations = [...timings] timings = [] + // Get the number of messages left in the queue + const count = await this.getQueueMessageCount(queueConf) + + let message = `Topic has ${count} messages left!` if (durations.length > 0) { const average = durations.reduce((a, b) => a + b, 0) / durations.length - this.log.info( - { topic: queueConf.name }, - `In the last minute ${durations.length} messages were processed - average processing time: ${average.toFixed(2)}ms!`, - ) + message += ` In the last minute ${durations.length} messages were processed - average processing time: ${average.toFixed(2)}ms!` } - - // Get the number of messages left in the queue - const count = await this.getQueueMessageCount(queueConf) - this.log.info({ topic: queueConf.name }, `Topic has ${count} messages left!`) + this.log.info({ 
topic: queueConf.name }, message) } catch (err) { // do nothing } From 707be16c129ea3f2459afced244bedab48895772 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sun, 12 Jan 2025 10:40:36 +0100 Subject: [PATCH 12/24] test with 1 message processing at a time --- services/apps/data_sink_worker/src/queue/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index 5deaa12fa3..94d143ff68 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -30,7 +30,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { level, client, client.getQueueChannelConfig(CrowdQueue.DATA_SINK_WORKER), - 3, + 1, parentLog, undefined, undefined, From 113f52747a5abc87ae1b1a808bc7c9016518693d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sun, 12 Jan 2025 10:48:17 +0100 Subject: [PATCH 13/24] test with 2 message processing at a time --- services/apps/data_sink_worker/src/queue/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index 94d143ff68..cea052dcb0 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -30,7 +30,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { level, client, client.getQueueChannelConfig(CrowdQueue.DATA_SINK_WORKER), - 1, + 2, parentLog, undefined, undefined, From 3af53a8c5cbef68d78c785ad37d03bad48325d4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sun, 12 Jan 2025 10:59:05 +0100 Subject: [PATCH 14/24] test with 3 message processing at a time --- services/apps/data_sink_worker/src/queue/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index cea052dcb0..5deaa12fa3 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -30,7 +30,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { level, client, client.getQueueChannelConfig(CrowdQueue.DATA_SINK_WORKER), - 2, + 3, parentLog, undefined, undefined, From 60475e17e1c1dea4d77c2783b3431081a050f4b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sun, 12 Jan 2025 11:44:51 +0100 Subject: [PATCH 15/24] test with 2 message processing at a time --- services/apps/data_sink_worker/src/queue/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index 5deaa12fa3..cea052dcb0 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -30,7 +30,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { level, client, client.getQueueChannelConfig(CrowdQueue.DATA_SINK_WORKER), - 3, + 2, parentLog, undefined, undefined, From df65d55749515a830d4534674dc38ecdde01cb20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sun, 12 Jan 2025 12:15:04 +0100 Subject: [PATCH 16/24] optimization --- .../apps/data_sink_worker/src/queue/index.ts | 2 +- .../src/service/dataSink.service.ts | 36 +++-- .../data_sink_worker/repo/dataSink.repo.ts | 132 +++++++++++------- 3 
files changed, 104 insertions(+), 66 deletions(-) diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index cea052dcb0..0b63f27c7d 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -58,7 +58,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { break case DataSinkWorkerQueueMessageType.CREATE_AND_PROCESS_ACTIVITY_RESULT: { const msg = message as CreateAndProcessActivityResultQueueMessage - await service.createAndProcessActivityResult( + await service.processActivityInMemoryResult( msg.tenantId, msg.segmentId, msg.integrationId, diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index b7ff422358..0b4c645636 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -1,4 +1,4 @@ -import { addSeconds } from '@crowd/common' +import { addSeconds, generateUUIDv1 } from '@crowd/common' import { DataSinkWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/common_services' import { DbStore } from '@crowd/data-access-layer/src/database' import { IResultData } from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.data' @@ -42,6 +42,7 @@ export default class DataSinkService extends LoggerBase { private async triggerResultError( resultInfo: IResultData, + isCreated: boolean, location: string, message: string, metadata?: unknown, @@ -63,9 +64,15 @@ export default class DataSinkService extends LoggerBase { // delay for #retries * 2 minutes const until = addSeconds(new Date(), (resultInfo.retries + 1) * 2 * 60) this.log.warn({ until: until.toISOString() }, 'Retrying result!') - await this.repo.delayResult(resultInfo.id, until, errorData) + + await this.repo.delayResult( + resultInfo.id, + until, + errorData, + isCreated ? undefined : resultInfo, + ) } else { - await this.repo.markResultError(resultInfo.id, errorData) + await this.repo.markResultError(resultInfo.id, errorData, isCreated ? undefined : resultInfo) } } @@ -98,7 +105,7 @@ export default class DataSinkService extends LoggerBase { } } - public async createAndProcessActivityResult( + public async processActivityInMemoryResult( tenantId: string, segmentId: string, integrationId: string, @@ -112,13 +119,15 @@ export default class DataSinkService extends LoggerBase { segmentId, } - const [integration, resultId] = await Promise.all([ - integrationId ? 
this.repo.getIntegrationInfo(integrationId) : Promise.resolve(null), - this.repo.createResult(tenantId, integrationId, payload), - ]) + let integration + + if (integrationId) { + integration = await this.repo.getIntegrationInfo(integrationId) + } + const id = generateUUIDv1() const result: IResultData = { - id: resultId, + id, tenantId, integrationId, data: payload, @@ -132,7 +141,7 @@ export default class DataSinkService extends LoggerBase { onboarding: false, } - await this.processResult(resultId, result) + await this.processResult(id, result) } public async processResult(resultId: string, result?: IResultData): Promise { @@ -263,13 +272,18 @@ export default class DataSinkService extends LoggerBase { type: data.type, }, ) - await this.repo.deleteResult(resultId) + + if (!result) { + await this.repo.deleteResult(resultId) + } + return true } catch (err) { this.log.error(err, 'Error processing result.') try { await this.triggerResultError( resultInfo, + result === undefined, 'process-result', 'Error processing result.', undefined, diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts index d54795dfe7..0e6aeaae1b 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts @@ -46,28 +46,6 @@ export default class DataSinkRepository extends RepositoryBase { - const results = await this.db().one( - ` - insert into integration.results(state, data, "tenantId", "integrationId") - values($(state), $(data), $(tenantId), $(integrationId)) - returning id; - `, - { - tenantId, - integrationId, - state: IntegrationResultState.PENDING, - data: JSON.stringify(result), - }, - ) - - return results.id - } - public async getOldResultsToProcessForTenant( tenantId: string, limit: number, @@ -147,22 +125,44 @@ export default class DataSinkRepository extends RepositoryBase { - const result = await this.db().result( - `update integration.results - set state = $(state), - "processedAt" = now(), - error = $(error), - "updatedAt" = now() - where id = $(resultId)`, - { - resultId, - state: IntegrationResultState.ERROR, - error: JSON.stringify(error), - }, - ) + public async markResultError( + resultId: string, + error: unknown, + resultToCreate?: IResultData, + ): Promise { + if (resultToCreate) { + const result = await this.db().result( + ` + insert into integration.results(state, data, "tenantId", "integrationId", error) + values($(state), $(data), $(tenantId), $(integrationId), $(error)) + `, + { + tenantId: resultToCreate.tenantId, + integrationId: resultToCreate.integrationId, + state: IntegrationResultState.ERROR, + data: JSON.stringify(resultToCreate.data), + error: JSON.stringify(error), + }, + ) - this.checkUpdateRowCount(result.rowCount, 1) + this.checkUpdateRowCount(result.rowCount, 1) + } else { + const result = await this.db().result( + `update integration.results + set state = $(state), + "processedAt" = now(), + error = $(error), + "updatedAt" = now() + where id = $(resultId)`, + { + resultId, + state: IntegrationResultState.ERROR, + error: JSON.stringify(error), + }, + ) + + this.checkUpdateRowCount(result.rowCount, 1) + } } public async deleteResult(resultId: string): Promise { @@ -266,24 +266,48 @@ export default class DataSinkRepository extends RepositoryBase r.id) } - public async delayResult(resultId: string, until: Date, error: unknown): Promise 
{ - const result = await this.db().result( - `update integration.results - set state = $(state), - error = $(error), - "delayedUntil" = $(until), - retries = coalesce(retries, 0) + 1, - "updatedAt" = now() - where id = $(resultId)`, - { - resultId, - until, - error: JSON.stringify(error), - state: IntegrationResultState.DELAYED, - }, - ) + public async delayResult( + resultId: string, + until: Date, + error: unknown, + resultToCreate?: IResultData, + ): Promise { + if (resultToCreate) { + const result = await this.db().result( + ` + insert into integration.results(state, data, "tenantId", "integrationId", error, retries, "delayedUntil") + values($(state), $(data), $(tenantId), $(integrationId), $(error), $(retries), $(until)) + `, + { + tenantId: resultToCreate.tenantId, + integrationId: resultToCreate.integrationId, + state: IntegrationResultState.DELAYED, + data: JSON.stringify(resultToCreate.data), + retries: 1, + error: JSON.stringify(error), + until: until, + }, + ) + this.checkUpdateRowCount(result.rowCount, 1) + } else { + const result = await this.db().result( + `update integration.results + set state = $(state), + error = $(error), + "delayedUntil" = $(until), + retries = coalesce(retries, 0) + 1, + "updatedAt" = now() + where id = $(resultId)`, + { + resultId, + until, + error: JSON.stringify(error), + state: IntegrationResultState.DELAYED, + }, + ) - this.checkUpdateRowCount(result.rowCount, 1) + this.checkUpdateRowCount(result.rowCount, 1) + } } public async getDelayedResults(limit: number): Promise { From aa2d49aa2ec62dfca8fa0eb2286018e8fa04f204 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sun, 12 Jan 2025 12:26:06 +0100 Subject: [PATCH 17/24] optimization --- services/apps/data_sink_worker/src/service/dataSink.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index 0b4c645636..d4ddaca3e8 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -111,7 +111,7 @@ export default class DataSinkService extends LoggerBase { integrationId: string, data: IActivityData, ): Promise { - this.log.info({ tenantId, segmentId }, 'Creating and processing activity result.') + this.log.info({ tenantId, segmentId }, 'Processing in memory activity result.') const payload = { type: IntegrationResultType.ACTIVITY, From 90de357ac8f9d9c710f41fcd5a6a471192f12edb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sun, 12 Jan 2025 15:02:50 +0100 Subject: [PATCH 18/24] test with 5 --- services/apps/data_sink_worker/src/queue/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index 0b63f27c7d..b8f99c1a00 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -30,7 +30,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { level, client, client.getQueueChannelConfig(CrowdQueue.DATA_SINK_WORKER), - 2, + 5, parentLog, undefined, undefined, From 0f7e36a1a5ba49b09a7a53364f2dfe5e0c74512b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sun, 12 Jan 2025 15:22:23 +0100 Subject: [PATCH 19/24] test with 1 --- services/apps/data_sink_worker/src/queue/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index b8f99c1a00..5b86d637d0 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -30,7 +30,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { level, client, client.getQueueChannelConfig(CrowdQueue.DATA_SINK_WORKER), - 5, + 1, parentLog, undefined, undefined, From 17c062522d129c9103915c35907c5a24c2f5eadf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sun, 12 Jan 2025 15:33:30 +0100 Subject: [PATCH 20/24] test with 5 --- services/apps/data_sink_worker/src/queue/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index 5b86d637d0..b8f99c1a00 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -30,7 +30,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { level, client, client.getQueueChannelConfig(CrowdQueue.DATA_SINK_WORKER), - 1, + 5, parentLog, undefined, undefined, From 2312a47a88ea445b1bf79f35f5fea3ab0577f718 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Sun, 12 Jan 2025 17:51:54 +0100 Subject: [PATCH 21/24] env var for concurrenty --- services/apps/data_sink_worker/src/queue/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index b8f99c1a00..12cfcaa6b8 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -30,7 +30,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { level, client, client.getQueueChannelConfig(CrowdQueue.DATA_SINK_WORKER), - 5, + Number(process.env.DATA_SINK_WORKER_MAX_CONCURRENCY || 1), parentLog, undefined, undefined, From a4310c80c55365548f972e7ee6091afea7cc8da5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 13 Jan 2025 08:43:45 +0100 Subject: [PATCH 22/24] env var for concurrenty --- services/apps/data_sink_worker/src/queue/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index 12cfcaa6b8..e0f394b1a0 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -30,7 +30,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { level, client, client.getQueueChannelConfig(CrowdQueue.DATA_SINK_WORKER), - Number(process.env.DATA_SINK_WORKER_MAX_CONCURRENCY || 1), + Number(process.env.WORKER_MAX_CONCURRENCY || 1), parentLog, undefined, undefined, From 81fea7b149bf665cd1ba1f101aa058ae19639894 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 13 Jan 2025 09:49:32 +0100 Subject: [PATCH 23/24] msg per second --- services/libs/queue/src/vendors/kafka/client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/libs/queue/src/vendors/kafka/client.ts b/services/libs/queue/src/vendors/kafka/client.ts index 80fd8c76e4..44bd2ec74e 100644 --- a/services/libs/queue/src/vendors/kafka/client.ts +++ b/services/libs/queue/src/vendors/kafka/client.ts @@ -370,7 +370,7 @@ export class KafkaQueueService extends LoggerBase implements IQueue 
{ let message = `Topic has ${count} messages left!` if (durations.length > 0) { const average = durations.reduce((a, b) => a + b, 0) / durations.length - message += ` In the last minute ${durations.length} messages were processed - average processing time: ${average.toFixed(2)}ms!` + message += ` In the last minute ${durations.length} messages were processed (${(durations.length / 60.0).toFixed(2)} msg/s) - average processing time: ${average.toFixed(2)}ms!` } this.log.info({ topic: queueConf.name }, message) } catch (err) { From b809b8f916bf9a70da527a25505ac80c9800e4ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 13 Jan 2025 18:10:43 +0100 Subject: [PATCH 24/24] lint fix --- .../src/old/apps/data_sink_worker/repo/dataSink.repo.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts index 0e6aeaae1b..65602e5dc0 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts @@ -1,7 +1,7 @@ import { distinct, singleOrDefault } from '@crowd/common' import { DbStore, RepositoryBase } from '@crowd/database' import { Logger } from '@crowd/logging' -import { IIntegrationResult, IntegrationResultState } from '@crowd/types' +import { IntegrationResultState } from '@crowd/types' import { IDelayedResults, IFailedResultData, IIntegrationData, IResultData } from './dataSink.data'
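
Taken together, PATCH 08, 21 and 22 change the Kafka consumer from awaiting each message inline to a bounded fire-and-forget model: eachMessage waits until a processing slot is free, increments a job counter, kicks off processMessage without awaiting it, and releases the slot in .finally(), with the slot count read from WORKER_MAX_CONCURRENCY. PATCH 10, 11 and 23 add a once-a-minute interval that drains the collected per-message durations and logs throughput (msg/s) and average latency. Below is a minimal standalone sketch of that pattern; the sleep helper, the local jobCount counter and the handleMessage callback are stand-ins for the service's own timeout/addJob/removeJob/isAvailable members and processMessage, not the actual implementation.

import { performance } from 'perf_hooks'

type Handler = (data: unknown) => Promise<void>

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

function createBoundedProcessor(maxConcurrent: number, handler: Handler) {
  let jobCount = 0
  const timings: number[] = []

  // Once a minute, drain the collected durations and report throughput and
  // average latency, mirroring the statisticsInterval added in PATCH 10/11/23.
  const reporter = setInterval(() => {
    const durations = timings.splice(0, timings.length)
    if (durations.length > 0) {
      const average = durations.reduce((a, b) => a + b, 0) / durations.length
      console.log(
        `In the last minute ${durations.length} messages were processed ` +
          `(${(durations.length / 60).toFixed(2)} msg/s) - average processing time: ${average.toFixed(2)}ms!`,
      )
    }
  }, 60000)

  return {
    stop: () => clearInterval(reporter),
    // Called from the consumer's eachMessage callback.
    process: async (data: unknown): Promise<void> => {
      // Back-pressure: wait until one of the maxConcurrent slots is free.
      while (jobCount >= maxConcurrent) {
        await sleep(10)
      }
      jobCount++
      const start = performance.now()

      // Fire-and-forget so eachMessage can pick up the next message immediately;
      // the duration is recorded for both successful and failed messages.
      handler(data)
        .catch((err) => console.error(err, 'Message processed unsuccessfully!'))
        .finally(() => {
          timings.push(performance.now() - start)
          jobCount--
        })
    },
  }
}

The later patches (12 through 15 and 18 through 20) simply flip this slot count between 1, 2, 3 and 5 to compare throughput, and PATCH 21/22 make it configurable, defaulting to a single slot when WORKER_MAX_CONCURRENCY is not set.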
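
The per-minute report also needs to know how many messages are still waiting. PATCH 10 adds getQueueMessageCount for this: it sums, per partition, the gap between the topic's latest offsets and the consumer group's committed offsets using the kafkajs admin client (the group id and topic share the channel name). A trimmed-down sketch of that lag calculation, assuming an already-configured kafkajs Kafka instance is passed in, looks like this:

import { Kafka } from 'kafkajs'

// Total number of unconsumed messages for a topic/consumer-group pair,
// summed across all partitions.
async function getQueueMessageCount(kafka: Kafka, topic: string, groupId: string): Promise<number> {
  const admin = kafka.admin()
  await admin.connect()

  try {
    // Latest offset per partition (the "end" of the topic).
    const topicOffsets = await admin.fetchTopicOffsets(topic)
    // Offsets the consumer group has committed so far.
    const offsetsResponse = await admin.fetchOffsets({ groupId, topics: [topic] })
    const committed = offsetsResponse[0].partitions

    let totalLeft = 0
    for (const partition of committed) {
      const latest = topicOffsets.find((p) => p.partition === partition.partition)
      if (latest && latest.offset !== partition.offset) {
        totalLeft += Number(latest.offset) - Number(partition.offset)
      }
    }

    return totalLeft
  } finally {
    await admin.disconnect()
  }
}

The statistics interval calls this once a minute and wraps the whole block in a try/catch that deliberately swallows errors ("// do nothing"), so a slow or failing admin request degrades the log line rather than the consumer itself.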
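
On the data_sink_worker side, PATCH 16 applies the same idea of keeping work off the hot path: createAndProcessActivityResult becomes processActivityInMemoryResult, which no longer inserts a pending row into integration.results before processing. Instead it builds the IResultData in memory with a freshly generated UUID and hands it straight to processResult; a row is only written if processing fails, either as a DELAYED insert when retries remain or as an ERROR insert once they are exhausted, which is why delayResult and markResultError gain an optional resultToCreate parameter. A condensed sketch of that control flow follows, with the repository calls reduced to hypothetical persistDelayed/persistError callbacks and an illustrative maxRetries default:

import { randomUUID } from 'crypto'

interface InMemoryResult {
  id: string
  retries: number
  data: unknown
}

// Condensed control flow of the in-memory result handling from PATCH 16.
// handle, persistDelayed and persistError are stand-ins for the service's
// processResult path and the repo's new insert-on-error branches.
async function processInMemoryResult(
  data: unknown,
  handle: (result: InMemoryResult) => Promise<void>,
  persistDelayed: (result: InMemoryResult, until: Date, err: unknown) => Promise<void>,
  persistError: (result: InMemoryResult, err: unknown) => Promise<void>,
  maxRetries = 5, // illustrative default, not taken from the patch
): Promise<boolean> {
  // Happy path: the result only ever exists in memory, so there is
  // nothing to insert up front and nothing to delete afterwards.
  const result: InMemoryResult = { id: randomUUID(), retries: 0, data }

  try {
    await handle(result)
    return true
  } catch (err) {
    if (result.retries + 1 <= maxRetries) {
      // Retries left: insert the result as DELAYED, backing off by
      // (retries + 1) * 2 minutes as the service does.
      const until = new Date(Date.now() + (result.retries + 1) * 2 * 60 * 1000)
      await persistDelayed(result, until, err)
    } else {
      // Out of retries: insert it as ERROR for later inspection.
      await persistError(result, err)
    }
    return false
  }
}

The deleteResult call on the success path is likewise skipped when the result was never persisted (the `if (!result)` guard in processResult), so the common case performs no writes to integration.results at all.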