From 789ce06727ae469fc6bf68f5856dff2e804902f7 Mon Sep 17 00:00:00 2001 From: anilb Date: Fri, 25 Oct 2024 17:23:10 +0200 Subject: [PATCH 1/8] fix timeouts --- .../src/schedules/getMembersToEnrich.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts b/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts index af3f20f816..dac2fc7648 100644 --- a/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts +++ b/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts @@ -20,7 +20,8 @@ export const scheduleMembersEnrichment = async () => { type: 'startWorkflow', workflowType: getMembersToEnrich, taskQueue: 'members-enrichment', - workflowExecutionTimeout: '5 minutes', + workflowExecutionTimeout: '5 days', + workflowRunTimeout: '10 minutes', args: [ { afterId: null, From 483ab340d3d9d24ae47073172b78f659fd60777f Mon Sep 17 00:00:00 2001 From: anilb Date: Fri, 25 Oct 2024 17:33:56 +0200 Subject: [PATCH 2/8] escape null byte in cache data --- .../src/old/apps/premium/members_enrichment_worker/index.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts b/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts index efbf266f99..14f4eccf9a 100644 --- a/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts +++ b/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts @@ -1,4 +1,4 @@ -import { generateUUIDv4 } from '@crowd/common' +import { escapeNullByte, generateUUIDv4 } from '@crowd/common' import { DbConnOrTx, DbStore, DbTransaction } from '@crowd/database' import { IAttributes, @@ -475,7 +475,7 @@ export async function insertMemberEnrichmentCacheDb( memberId: string, source: MemberEnrichmentSource, ) { - const dataSanitized = data ? JSON.stringify(data) : null + const dataSanitized = data ? escapeNullByte(JSON.stringify(data)) : null return tx.query( `INSERT INTO "memberEnrichmentCache" ("memberId", "data", "createdAt", "updatedAt", "source") VALUES ($1, $2, NOW(), NOW(), $3);`, @@ -489,7 +489,7 @@ export async function updateMemberEnrichmentCacheDb( memberId: string, source: MemberEnrichmentSource, ) { - const dataSanitized = data ? JSON.stringify(data) : null + const dataSanitized = data ? escapeNullByte(JSON.stringify(data)) : null return tx.query( `UPDATE "memberEnrichmentCache" SET From 5a2f5d53d7f34f82921a57a14e50ea84b00fd2d2 Mon Sep 17 00:00:00 2001 From: anilb Date: Fri, 25 Oct 2024 17:44:22 +0200 Subject: [PATCH 3/8] remove workflow level timeouts --- .../src/schedules/getMembersToEnrich.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts b/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts index dac2fc7648..a8ce8afcef 100644 --- a/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts +++ b/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts @@ -20,8 +20,6 @@ export const scheduleMembersEnrichment = async () => { type: 'startWorkflow', workflowType: getMembersToEnrich, taskQueue: 'members-enrichment', - workflowExecutionTimeout: '5 days', - workflowRunTimeout: '10 minutes', args: [ { afterId: null, From 2c81a77812a518ecf0481a3bdd79f44cbf43d4e2 Mon Sep 17 00:00:00 2001 From: anilb Date: Fri, 25 Oct 2024 20:38:22 +0200 Subject: [PATCH 4/8] use redactNullByte for cache data --- services/libs/common/src/utils.ts | 3 +++ .../src/old/apps/premium/members_enrichment_worker/index.ts | 6 +++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/services/libs/common/src/utils.ts b/services/libs/common/src/utils.ts index 0450d16f9a..d5a807bdb0 100644 --- a/services/libs/common/src/utils.ts +++ b/services/libs/common/src/utils.ts @@ -69,6 +69,9 @@ export class BatchProcessor { export const escapeNullByte = (str: string | null | undefined): string => str ? str.replace(/\0/g, 'u0000') : str +export const redactNullByte = (str: string | null | undefined): string => + str ? str.replace(/\0/g, '[NULL]') : '' + export const dateEqualityChecker = (a, b) => { if (a instanceof Date) { a = a.toISOString() diff --git a/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts b/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts index 14f4eccf9a..314dfe89ac 100644 --- a/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts +++ b/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts @@ -1,4 +1,4 @@ -import { escapeNullByte, generateUUIDv4 } from '@crowd/common' +import { generateUUIDv4, redactNullByte } from '@crowd/common' import { DbConnOrTx, DbStore, DbTransaction } from '@crowd/database' import { IAttributes, @@ -475,7 +475,7 @@ export async function insertMemberEnrichmentCacheDb( memberId: string, source: MemberEnrichmentSource, ) { - const dataSanitized = data ? escapeNullByte(JSON.stringify(data)) : null + const dataSanitized = data ? redactNullByte(JSON.stringify(data)) : null return tx.query( `INSERT INTO "memberEnrichmentCache" ("memberId", "data", "createdAt", "updatedAt", "source") VALUES ($1, $2, NOW(), NOW(), $3);`, @@ -489,7 +489,7 @@ export async function updateMemberEnrichmentCacheDb( memberId: string, source: MemberEnrichmentSource, ) { - const dataSanitized = data ? escapeNullByte(JSON.stringify(data)) : null + const dataSanitized = data ? redactNullByte(JSON.stringify(data)) : null return tx.query( `UPDATE "memberEnrichmentCache" SET From 4c95990d9f85b433b08178eb930b2ea55126c515 Mon Sep 17 00:00:00 2001 From: anilb Date: Fri, 25 Oct 2024 20:53:57 +0200 Subject: [PATCH 5/8] improve redactNullByte --- services/libs/common/src/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/libs/common/src/utils.ts b/services/libs/common/src/utils.ts index d5a807bdb0..b76b2dde6e 100644 --- a/services/libs/common/src/utils.ts +++ b/services/libs/common/src/utils.ts @@ -70,7 +70,7 @@ export const escapeNullByte = (str: string | null | undefined): string => str ? str.replace(/\0/g, 'u0000') : str export const redactNullByte = (str: string | null | undefined): string => - str ? str.replace(/\0/g, '[NULL]') : '' + str ? str.replace(/\\u0000|\0/g, '[NULL]') : '' export const dateEqualityChecker = (a, b) => { if (a instanceof Date) { From e32037e8de01d7c48ad1e2a8586c4fc31c7a1af9 Mon Sep 17 00:00:00 2001 From: anilb Date: Fri, 25 Oct 2024 21:10:57 +0200 Subject: [PATCH 6/8] retry policy for activities --- .../members_enrichment_worker/src/workflows/enrichMember.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts b/services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts index 295f77545f..207991cd3b 100644 --- a/services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts +++ b/services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts @@ -16,6 +16,12 @@ const { normalizeEnrichmentData, } = proxyActivities({ startToCloseTimeout: '10 seconds', + retry: { + initialInterval: '5s', + backoffCoefficient: 2.0, + maximumInterval: '30s', + maximumAttempts: 4, + }, }) export async function enrichMember( From b563c00b0e9e6b444c0a81d0d025b8c3affaf8d2 Mon Sep 17 00:00:00 2001 From: anilb Date: Fri, 25 Oct 2024 21:21:40 +0200 Subject: [PATCH 7/8] reduce parallelization to 100 when enriching members --- .../src/workflows/getMembersToEnrich.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts b/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts index a12e763daa..07ed561e6c 100644 --- a/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts +++ b/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts @@ -20,7 +20,7 @@ const { getMembers } = proxyActivities({ }) export async function getMembersToEnrich(args: IGetMembersForEnrichmentArgs): Promise { - const MEMBER_ENRICHMENT_PER_RUN = 300 + const MEMBER_ENRICHMENT_PER_RUN = 100 const afterId = args?.afterId || null const sources = [MemberEnrichmentSource.PROGAI, MemberEnrichmentSource.CLEARBIT] From 2c0f49c060b3778e1bbd0f0f83c201eab8effca9 Mon Sep 17 00:00:00 2001 From: anilb Date: Sat, 26 Oct 2024 13:11:11 +0200 Subject: [PATCH 8/8] add retry to main workflow --- .../src/schedules/getMembersToEnrich.ts | 5 +++++ services/apps/premium/members_enrichment_worker/src/types.ts | 4 ++++ .../src/workflows/getMembersToEnrich.ts | 4 ++-- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts b/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts index a8ce8afcef..5a69c80633 100644 --- a/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts +++ b/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts @@ -20,6 +20,11 @@ export const scheduleMembersEnrichment = async () => { type: 'startWorkflow', workflowType: getMembersToEnrich, taskQueue: 'members-enrichment', + retry: { + initialInterval: '15 seconds', + backoffCoefficient: 2, + maximumAttempts: 3, + }, args: [ { afterId: null, diff --git a/services/apps/premium/members_enrichment_worker/src/types.ts b/services/apps/premium/members_enrichment_worker/src/types.ts index 5ad9345f6f..1cdd70fcb8 100644 --- a/services/apps/premium/members_enrichment_worker/src/types.ts +++ b/services/apps/premium/members_enrichment_worker/src/types.ts @@ -23,7 +23,11 @@ export type IMemberEnrichmentData = IMemberEnrichmentDataProgAI | IMemberEnrichm export interface IEnrichmentService { source: MemberEnrichmentSource + + // cache rows with older updatedAt than this will be considered obsolete and will be re-enriched cacheObsoleteAfterSeconds: number + + // can the source enrich using this input isEnrichableBySource(input: IEnrichmentSourceInput): boolean // what kind of identities can this source use as input diff --git a/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts b/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts index 07ed561e6c..99527c128a 100644 --- a/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts +++ b/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts @@ -34,8 +34,8 @@ export async function getMembersToEnrich(args: IGetMembersForEnrichmentArgs): Pr members.map((member) => { return executeChild(enrichMember, { workflowId: 'member-enrichment/' + member.tenantId + '/' + member.id, - cancellationType: ChildWorkflowCancellationType.ABANDON, - parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON, + cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, + parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL, workflowExecutionTimeout: '15 minutes', retry: { backoffCoefficient: 2,