Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions services/apps/script_executor_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import {
findMemberIdentitiesGroupedByPlatform,
findMemberMergeActions,
} from './activities/dissect-member'
import {
findActivitiesWithWrongMembers,
findMemberIdentity,
updateActivityWithWrongMember,
} from './activities/fix-misattributed-activities'
import {
deleteOrganizationIdentity,
findOrganizationIdentity,
Expand Down Expand Up @@ -38,4 +43,7 @@ export {
updateOrganizationIdentity,
deleteOrganizationIdentity,
isLfxMember,
findActivitiesWithWrongMembers,
findMemberIdentity,
updateActivityWithWrongMember,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { ActivityRepository } from '@crowd/data-access-layer/src/old/apps/script_executor_worker/activities.repo'
import MemberRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/member.repo'
import { IActivityCreateData, IMemberIdentity } from '@crowd/types'

import { svc } from '../../main'

export async function findActivitiesWithWrongMembers(tenantId: string, limit: number) {
let activitiesWithWrongMember: IActivityCreateData[] = []

try {
const activityRepo = new ActivityRepository(svc.postgres.reader.connection(), svc.log)
activitiesWithWrongMember = await activityRepo.getActivitiesWithWrongMembers(tenantId, limit)
} catch (err) {
throw new Error(err)
}

return activitiesWithWrongMember
}

export async function findMemberIdentity(username: string, platform: string, tenantId: string) {
let memberIdentity: IMemberIdentity

try {
const memberRepo = new MemberRepository(svc.postgres.reader.connection(), svc.log)
memberIdentity = await memberRepo.findMemberIdentity(username, platform, tenantId)
} catch (err) {
throw new Error(err)
}

return memberIdentity
}

export async function updateActivityWithWrongMember(
activityId: string,
username: string,
platform: string,
correctMemberId: string,
tenantId: string,
) {
try {
const activityRepo = new ActivityRepository(svc.postgres.writer.connection(), svc.log)
await activityRepo.updateActivityWithWrongMember(
activityId,
username,
platform,
correctMemberId,
tenantId,
)
} catch (err) {
throw new Error(err)
}
}
5 changes: 4 additions & 1 deletion services/apps/script_executor_worker/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ export interface IDissectMemberArgs {
forceSplitAllIdentities?: boolean
}

export interface IFixOrgIdentitiesWithWrongUrlsArgs {
export interface tenantIdAndTestRun {
tenantId: string
testRun?: boolean
}

export type IFixMisattributedActivitiesArgs = tenantIdAndTestRun
export type IFixOrgIdentitiesWithWrongUrlsArgs = tenantIdAndTestRun
2 changes: 2 additions & 0 deletions services/apps/script_executor_worker/src/workflows.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { dissectMember } from './workflows/dissectMember'
import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization'
import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms'
import { fixMisattributedActivities } from './workflows/fixMisattributedActivities'
import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls'

export {
findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms,
findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization,
dissectMember,
fixOrgIdentitiesWithWrongUrls,
fixMisattributedActivities,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { continueAsNew, proxyActivities } from '@temporalio/workflow'

import * as activities from '../activities'
import { IFixMisattributedActivitiesArgs } from '../types'

const activity = proxyActivities<typeof activities>({
startToCloseTimeout: '10 minute',
retry: { maximumAttempts: 3 },
})

export async function fixMisattributedActivities(
args: IFixMisattributedActivitiesArgs,
): Promise<void> {
const PROCESS_ACTIVITIES_PER_RUN = args.testRun ? 10 : 1000

if (args.testRun) {
console.log(`Running in test mode with limit 10!`)
}

const tenantId = args.tenantId

const activitiesWithWrongMember = await activity.findActivitiesWithWrongMembers(
tenantId,
PROCESS_ACTIVITIES_PER_RUN,
)

if (!activitiesWithWrongMember.length) {
console.log(`No activities found with misattributed members!`)
return
}

for (const a of activitiesWithWrongMember) {
const memberIdentity = await activity.findMemberIdentity(a.username, a.platform, tenantId)
if (memberIdentity) {
await activity.updateActivityWithWrongMember(
a.id,
a.username,
a.platform,
memberIdentity.memberId,
tenantId,
)
}
}

if (!args.testRun) {
await continueAsNew<typeof fixMisattributedActivities>({
testRun: args.testRun,
tenantId: args.tenantId,
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { DbConnection, DbTransaction } from '@crowd/database'
import { Logger } from '@crowd/logging'
import { IActivityCreateData } from '@crowd/types'

export class ActivityRepository {
constructor(
private readonly connection: DbConnection | DbTransaction,
private readonly log: Logger,
) {}

async getActivitiesWithWrongMembers(
tenantId: string,
limit = 100,
): Promise<IActivityCreateData[]> {
try {
return await this.connection.query(
`
SELECT
a.id,
a.username,
a.platform
FROM activities a
JOIN "memberIdentities" mi ON a.username = mi.value
AND a.platform = mi.platform
AND mi.type = 'username'
AND mi."verified" = true
AND a."tenantId" = mi."tenantId"
WHERE a."memberId" <> mi."memberId"
AND a."tenantId" = $(tenantId)
LIMIT $(limit)
`,
{
tenantId,
limit,
},
)
} catch (err) {
this.log.error('Error while finding activities!', err)
throw new Error(err)
}
}

async updateActivityWithWrongMember(
activityId: string,
username: string,
platform: string,
correctMemberId: string,
tenantId: string,
): Promise<void> {
try {
await this.connection.none(
`
UPDATE activities
SET "memberId" = $(correctMemberId)
WHERE id = $(activityId)
AND "username" = $(username)
AND platform = $(platform)
AND "tenantId" = $(tenantId)
`,
{
username,
platform,
correctMemberId,
tenantId,
activityId,
},
)
} catch (err) {
this.log.error('Error while updating activities!', err)
throw new Error(err)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { DbConnection, DbTransaction } from '@crowd/database'
import { Logger } from '@crowd/logging'
import { IMember } from '@crowd/types'
import { IMember, IMemberIdentity } from '@crowd/types'

import { IFindMemberIdentitiesGroupedByPlatformResult, ISimilarMember } from './types'

Expand Down Expand Up @@ -159,6 +159,36 @@ class MemberRepository {

return member
}

async findMemberIdentity(
username: string,
platform: string,
tenantId: string,
type = 'username',
): Promise<IMemberIdentity | null> {
let memberIdentity: IMemberIdentity

try {
memberIdentity = await this.connection.oneOrNone(
`
select * from "memberIdentities" where value = $(username) and platform = $(platform)
and type = $(type) and verified = true and "tenantId" = $(tenantId)
`,
{
username,
platform,
type,
tenantId,
},
)
} catch (err) {
this.log.error('Error while finding member identity!', err)

throw new Error(err)
}

return memberIdentity
}
}

export default MemberRepository
Loading