diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8e203a7775..38995027b9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1275,6 +1275,9 @@ importers: axios: specifier: ^1.6.8 version: 1.6.8 + csv-parse: + specifier: ^5.5.6 + version: 5.5.6 moment: specifier: ~2.29.4 version: 2.29.4 @@ -9925,7 +9928,7 @@ snapshots: dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) + '@aws-sdk/client-sso-oidc': 3.572.0 '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) @@ -10169,7 +10172,7 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sso-oidc@3.572.0(@aws-sdk/client-sts@3.572.0)': + '@aws-sdk/client-sso-oidc@3.572.0': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 @@ -10212,7 +10215,6 @@ snapshots: '@smithy/util-utf8': 2.3.0 tslib: 2.6.2 transitivePeerDependencies: - - '@aws-sdk/client-sts' - aws-crt '@aws-sdk/client-sso-oidc@3.687.0(@aws-sdk/client-sts@3.687.0)': @@ -10437,7 +10439,7 @@ snapshots: dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) + '@aws-sdk/client-sso-oidc': 3.572.0 '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -11072,7 +11074,7 @@ snapshots: '@aws-sdk/token-providers@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)': dependencies: - '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) + '@aws-sdk/client-sso-oidc': 3.572.0 '@aws-sdk/types': 3.567.0 '@smithy/property-provider': 2.2.0 '@smithy/shared-ini-file-loader': 2.4.0 diff --git a/services/apps/script_executor_worker/package.json b/services/apps/script_executor_worker/package.json index 8d1806ac97..52303aad0a 100644 --- a/services/apps/script_executor_worker/package.json +++ b/services/apps/script_executor_worker/package.json @@ -21,6 +21,7 @@ "@crowd/types": "workspace:*", "@temporalio/workflow": "~1.11.1", "axios": "^1.6.8", + "csv-parse": "^5.5.6", "moment": "~2.29.4", "tsx": "^4.7.1", "typescript": "^5.6.3" diff --git a/services/apps/script_executor_worker/src/activities.ts b/services/apps/script_executor_worker/src/activities.ts index 0ec6f3703c..a878b77fd8 100644 --- a/services/apps/script_executor_worker/src/activities.ts +++ b/services/apps/script_executor_worker/src/activities.ts @@ -10,6 +10,13 @@ import { findMemberIdentitiesGroupedByPlatform, findMemberMergeActions, } from './activities/dissect-member' +import { + batchUpdateActivitiesWithWrongMember, + findActivitiesWithWrongMembers, + findMemberIdentity, + updateActivityWithWrongMember, + updateMemberActivitiesUpdatedAt, +} from './activities/fix-misattributed-activities' import { deleteOrganizationIdentity, findOrganizationIdentity, @@ -38,4 +45,9 @@ export { updateOrganizationIdentity, deleteOrganizationIdentity, isLfxMember, + findActivitiesWithWrongMembers, + findMemberIdentity, + updateActivityWithWrongMember, + batchUpdateActivitiesWithWrongMember, + updateMemberActivitiesUpdatedAt, } diff --git a/services/apps/script_executor_worker/src/activities/fix-misattributed-activities/index.ts b/services/apps/script_executor_worker/src/activities/fix-misattributed-activities/index.ts new file mode 100644 index 0000000000..7e922d95e6 --- /dev/null +++ b/services/apps/script_executor_worker/src/activities/fix-misattributed-activities/index.ts @@ -0,0 +1,96 @@ +import { parse } from 'csv-parse/sync' +import * as fs from 'fs' +import * as path from 'path' + +import { ActivityRepository } from '@crowd/data-access-layer/src/old/apps/script_executor_worker/activities.repo' +import MemberRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/member.repo' +import { IMemberIdentity } from '@crowd/types' + +import { svc } from '../../main' + +// export async function findActivitiesWithWrongMembers(tenantId: string, limit: number) { +// let activitiesWithWrongMember = [] + +// try { +// const activityRepo = new ActivityRepository( +// svc.postgres.reader.connection(), +// svc.log, +// svc.questdbSQL, +// ) +// activitiesWithWrongMember = await activityRepo.getActivitiesWithWrongMembers(tenantId, limit) +// } catch (err) { +// throw new Error(err) +// } + +// return activitiesWithWrongMember +// } + +export async function findMemberIdentity(username: string, platform: string, tenantId: string) { + let memberIdentity: IMemberIdentity + + try { + const memberRepo = new MemberRepository(svc.postgres.reader.connection(), svc.log) + memberIdentity = await memberRepo.findMemberIdentity(username, platform, tenantId) + } catch (err) { + throw new Error(err) + } + + return memberIdentity +} + +export async function updateActivityWithWrongMember(activityId: string, correctMemberId: string) { + try { + const activityRepo = new ActivityRepository( + svc.postgres.writer.connection(), + svc.log, + svc.questdbSQL, + ) + await activityRepo.updateActivityWithWrongMember(activityId, correctMemberId) + } catch (err) { + throw new Error(err) + } +} + +export async function batchUpdateActivitiesWithWrongMember( + wrongMemberId: string, + correctMemberId: string, +) { + try { + const activityRepo = new ActivityRepository( + svc.postgres.writer.connection(), + svc.log, + svc.questdbSQL, + ) + await activityRepo.batchUpdateActivitiesWithWrongMember(wrongMemberId, correctMemberId) + } catch (err) { + throw new Error(err) + } +} + +export async function updateMemberActivitiesUpdatedAt(memberId: string) { + try { + const activityRepo = new ActivityRepository( + svc.postgres.writer.connection(), + svc.log, + svc.questdbSQL, + ) + await activityRepo.updateMemberActivitiesUpdatedAt(memberId) + } catch (err) { + throw new Error(err) + } +} + +export async function findActivitiesWithWrongMembers(): Promise< + Array<{ + wrongMemberId: string + correctMemberId: string + activitiesCount: number + }> +> { + const csvFilePath = path.join(process.cwd(), 'misattributed_activities.csv') + const fileContent = fs.readFileSync(csvFilePath, 'utf-8') + return parse(fileContent, { + columns: true, + skip_empty_lines: true, + }) +} diff --git a/services/apps/script_executor_worker/src/main.ts b/services/apps/script_executor_worker/src/main.ts index 8ae0530519..cf70f25450 100644 --- a/services/apps/script_executor_worker/src/main.ts +++ b/services/apps/script_executor_worker/src/main.ts @@ -10,7 +10,7 @@ const config: Config = { enabled: true, }, questdb: { - enabled: false, + enabled: true, }, redis: { enabled: true, diff --git a/services/apps/script_executor_worker/src/types.ts b/services/apps/script_executor_worker/src/types.ts index 5447633312..3af8577ef8 100644 --- a/services/apps/script_executor_worker/src/types.ts +++ b/services/apps/script_executor_worker/src/types.ts @@ -22,7 +22,14 @@ export interface IDissectMemberArgs { forceSplitAllIdentities?: boolean } -export interface IFixOrgIdentitiesWithWrongUrlsArgs { +export interface tenantIdAndTestRun { tenantId: string testRun?: boolean } + +export type IFixOrgIdentitiesWithWrongUrlsArgs = tenantIdAndTestRun + +export interface IFixMisattributedActivitiesArgs { + startIndex?: number + testRun?: boolean +} diff --git a/services/apps/script_executor_worker/src/workflows.ts b/services/apps/script_executor_worker/src/workflows.ts index 783ed8e00c..e0d919fe83 100644 --- a/services/apps/script_executor_worker/src/workflows.ts +++ b/services/apps/script_executor_worker/src/workflows.ts @@ -1,6 +1,7 @@ import { dissectMember } from './workflows/dissectMember' import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization' import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms' +import { fixMisattributedActivities } from './workflows/fixMisattributedActivities' import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls' export { @@ -8,4 +9,5 @@ export { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization, dissectMember, fixOrgIdentitiesWithWrongUrls, + fixMisattributedActivities, } diff --git a/services/apps/script_executor_worker/src/workflows/fixMisattributedActivities.ts b/services/apps/script_executor_worker/src/workflows/fixMisattributedActivities.ts new file mode 100644 index 0000000000..d5c1f44d07 --- /dev/null +++ b/services/apps/script_executor_worker/src/workflows/fixMisattributedActivities.ts @@ -0,0 +1,58 @@ +import { proxyActivities } from '@temporalio/workflow' + +import * as activities from '../activities' +import { IFixMisattributedActivitiesArgs } from '../types' + +const activity = proxyActivities({ + startToCloseTimeout: '60 minutes', +}) + +export async function fixMisattributedActivities( + args: IFixMisattributedActivitiesArgs, +): Promise { + // Read CSV file + const records = await activity.findActivitiesWithWrongMembers() + + if (!records.length) { + console.log(`No activities found in the CSV file!`) + return + } + + const startIndex = Number(args.startIndex) + + if (!startIndex) { + console.log('something wrong with startIndex') + return + } + + console.log(`Starting at record ${startIndex}`) + + // Skip records that were already processed + const remainingRecords = records.slice(startIndex) + + if (!remainingRecords.length) { + console.log(`No remaining records to process after skipping ${startIndex} records!`) + return + } + + let processedMemberCount = 0 + const totalRecords = remainingRecords.length + + // Process each record from CSV + for (const record of remainingRecords) { + console.log(`Updating ${record.correctMemberId} member activities updatedAt`) + + await activity.updateMemberActivitiesUpdatedAt(record.correctMemberId) + + processedMemberCount++ + + if (args.testRun && processedMemberCount >= 10) { + console.log('Test run complete!') + break + } + + console.log(`Processed ${processedMemberCount}/${totalRecords} members in the CSV file!`) + } + + console.log('Completed processing all members!') +} diff --git a/services/libs/data-access-layer/src/old/apps/script_executor_worker/activities.repo.ts b/services/libs/data-access-layer/src/old/apps/script_executor_worker/activities.repo.ts new file mode 100644 index 0000000000..89abf38ecb --- /dev/null +++ b/services/libs/data-access-layer/src/old/apps/script_executor_worker/activities.repo.ts @@ -0,0 +1,106 @@ +import { DbConnOrTx, DbConnection, DbTransaction } from '@crowd/database' +import { Logger } from '@crowd/logging' + +import { updateActivities } from '../../../activities/update' + +export class ActivityRepository { + constructor( + private readonly connection: DbConnection | DbTransaction, + private readonly log: Logger, + private readonly questdbSQL: DbConnOrTx, + ) {} + + async getActivitiesWithWrongMembers( + tenantId: string, + limit = 100, + ): Promise<{ wrongMemberId: string; correctMemberId: string }[]> { + try { + return await this.connection.query( + ` + SELECT + a."memberId" as "wrongMemberId", + mi."memberId" as "correctMemberId" + FROM activities a + JOIN "memberIdentities" mi ON a.username = mi.value + AND a.platform = mi.platform + AND mi.type = 'username' + AND mi."verified" = true + AND a."tenantId" = mi."tenantId" + WHERE a."memberId" <> mi."memberId" + AND a."tenantId" = $(tenantId) + GROUP BY a."memberId", mi."memberId" + LIMIT $(limit) + `, + { + tenantId, + limit, + }, + ) + } catch (err) { + this.log.error('Error while finding activities!', err) + throw new Error(err) + } + } + + async updateActivityWithWrongMember(activityId: string, correctMemberId: string): Promise { + try { + // Update the activity in pgsql to persist progress + await this.connection.none( + ` + UPDATE activities + SET "memberId" = $(correctMemberId) + WHERE id = $(activityId) + `, + { + correctMemberId, + activityId, + }, + ) + + // Update the activity in QuestDB + await updateActivities( + this.questdbSQL, + async () => ({ memberId: correctMemberId }), + 'id = $(activityId)', + { + activityId, + }, + ) + } catch (err) { + this.log.error('Error while updating activities!', err) + throw new Error(err) + } + } + + async batchUpdateActivitiesWithWrongMember( + wrongMemberId: string, + correctMemberId: string, + ): Promise { + try { + await this.questdbSQL.none( + 'UPDATE "activities" SET "memberId" = $(correctMemberId) WHERE "memberId" = $(wrongMemberId)', + { + wrongMemberId, + correctMemberId, + }, + ) + } catch (err) { + this.log.error('Error while batch updating activities!', err) + throw new Error(err) + } + } + + async updateMemberActivitiesUpdatedAt(memberId: string): Promise { + try { + await this.questdbSQL.none( + 'UPDATE "activities" SET "updatedAt" = now() WHERE "memberId" = $(memberId)', + { + memberId, + }, + ) + } catch (err) { + this.log.error('Error while updating activities!', err) + throw new Error(err) + } + } +} diff --git a/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts b/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts index 126c76219d..9eca599e7d 100644 --- a/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts @@ -1,6 +1,6 @@ import { DbConnection, DbTransaction } from '@crowd/database' import { Logger } from '@crowd/logging' -import { IMember } from '@crowd/types' +import { IMember, IMemberIdentity } from '@crowd/types' import { IFindMemberIdentitiesGroupedByPlatformResult, ISimilarMember } from './types' @@ -159,6 +159,36 @@ class MemberRepository { return member } + + async findMemberIdentity( + username: string, + platform: string, + tenantId: string, + type = 'username', + ): Promise { + let memberIdentity: IMemberIdentity + + try { + memberIdentity = await this.connection.oneOrNone( + ` + select * from "memberIdentities" where value = $(username) and platform = $(platform) + and type = $(type) and verified = true and "tenantId" = $(tenantId) + `, + { + username, + platform, + type, + tenantId, + }, + ) + } catch (err) { + this.log.error('Error while finding member identity!', err) + + throw new Error(err) + } + + return memberIdentity + } } export default MemberRepository