Skip to content
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions services/apps/script_executor_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import {
findMemberIdentitiesGroupedByPlatform,
findMemberMergeActions,
} from './activities/dissect-member'
import {
findActivitiesWithWrongMembers,
findMemberIdentity,
updateActivityWithWrongMember,
} from './activities/fix-misattributed-activities'
import {
deleteOrganizationIdentity,
findOrganizationIdentity,
Expand Down Expand Up @@ -38,4 +43,7 @@ export {
updateOrganizationIdentity,
deleteOrganizationIdentity,
isLfxMember,
findActivitiesWithWrongMembers,
findMemberIdentity,
updateActivityWithWrongMember,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { ActivityRepository } from '@crowd/data-access-layer/src/old/apps/script_executor_worker/activities.repo'
import MemberRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/member.repo'
import { IActivityCreateData, IMemberIdentity } from '@crowd/types'

import { svc } from '../../main'

export async function findActivitiesWithWrongMembers(tenantId: string, limit: number) {
let activitiesWithWrongMember: IActivityCreateData[] = []

try {
const activityRepo = new ActivityRepository(svc.postgres.reader.connection(), svc.log)
activitiesWithWrongMember = await activityRepo.getActivitiesWithWrongMembers(tenantId, limit)
} catch (err) {
throw new Error(err)
}

return activitiesWithWrongMember
}

export async function findMemberIdentity(username: string, platform: string, tenantId: string) {
let memberIdentity: IMemberIdentity

try {
const memberRepo = new MemberRepository(svc.postgres.reader.connection(), svc.log)
memberIdentity = await memberRepo.findMemberIdentity(username, platform, tenantId)
} catch (err) {
throw new Error(err)
}

return memberIdentity
}

export async function updateActivityWithWrongMember(activityId: string, correctMemberId: string) {
try {
const activityRepo = new ActivityRepository(svc.postgres.writer.connection(), svc.log)
await activityRepo.updateActivityWithWrongMember(activityId, correctMemberId)
} catch (err) {
throw new Error(err)
}
}
5 changes: 4 additions & 1 deletion services/apps/script_executor_worker/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ export interface IDissectMemberArgs {
forceSplitAllIdentities?: boolean
}

export interface IFixOrgIdentitiesWithWrongUrlsArgs {
export interface tenantIdAndTestRun {
tenantId: string
testRun?: boolean
}

export type IFixMisattributedActivitiesArgs = tenantIdAndTestRun
export type IFixOrgIdentitiesWithWrongUrlsArgs = tenantIdAndTestRun
2 changes: 2 additions & 0 deletions services/apps/script_executor_worker/src/workflows.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { dissectMember } from './workflows/dissectMember'
import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization'
import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms'
import { fixMisattributedActivities } from './workflows/fixMisattributedActivities'
import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls'

export {
findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms,
findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization,
dissectMember,
fixOrgIdentitiesWithWrongUrls,
fixMisattributedActivities,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { continueAsNew, proxyActivities } from '@temporalio/workflow'

import * as activities from '../activities'
import { IFixMisattributedActivitiesArgs } from '../types'

const activity = proxyActivities<typeof activities>({
startToCloseTimeout: '10 minute',
retry: { maximumAttempts: 3 },
})

export async function fixMisattributedActivities(
args: IFixMisattributedActivitiesArgs,
): Promise<void> {
const PROCESS_ACTIVITIES_PER_RUN = args.testRun ? 10 : 1000

if (args.testRun) {
console.log(`Running in test mode with limit 10!`)
}

const tenantId = args.tenantId

const activitiesWithWrongMember = await activity.findActivitiesWithWrongMembers(
tenantId,
PROCESS_ACTIVITIES_PER_RUN,
)

if (!activitiesWithWrongMember.length) {
console.log(`No activities found with misattributed members!`)
return
}

for (const a of activitiesWithWrongMember) {
const memberIdentity = await activity.findMemberIdentity(a.username, a.platform, tenantId)
if (memberIdentity) {
await activity.updateActivityWithWrongMember(a.id, memberIdentity.memberId)
}
}

if (!args.testRun) {
await continueAsNew<typeof fixMisattributedActivities>({
testRun: args.testRun,
tenantId: args.tenantId,
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { DbConnection, DbTransaction } from '@crowd/database'
import { Logger } from '@crowd/logging'
import { IActivityCreateData } from '@crowd/types'

export class ActivityRepository {
constructor(
private readonly connection: DbConnection | DbTransaction,
private readonly log: Logger,
) {}

async getActivitiesWithWrongMembers(
tenantId: string,
limit = 100,
): Promise<IActivityCreateData[]> {
try {
return await this.connection.query(
`
SELECT
a.id,
a.username,
a.platform
FROM activities a
JOIN "memberIdentities" mi ON a.username = mi.value
AND a.platform = mi.platform
AND mi.type = 'username'
AND mi."verified" = true
AND a."tenantId" = mi."tenantId"
WHERE a."memberId" <> mi."memberId"
AND a."tenantId" = $(tenantId)
LIMIT $(limit)
`,
{
tenantId,
limit,
},
)
} catch (err) {
this.log.error('Error while finding activities!', err)
throw new Error(err)
}
}

async updateActivityWithWrongMember(activityId: string, correctMemberId: string): Promise<void> {
try {
await this.connection.none(
`
UPDATE activities
SET "memberId" = $(correctMemberId)
WHERE id = $(activityId)
`,
{
correctMemberId,
activityId,
},
)
} catch (err) {
this.log.error('Error while updating activities!', err)
throw new Error(err)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { DbConnection, DbTransaction } from '@crowd/database'
import { Logger } from '@crowd/logging'
import { IMember } from '@crowd/types'
import { IMember, IMemberIdentity } from '@crowd/types'

import { IFindMemberIdentitiesGroupedByPlatformResult, ISimilarMember } from './types'

Expand Down Expand Up @@ -159,6 +159,36 @@ class MemberRepository {

return member
}

async findMemberIdentity(
username: string,
platform: string,
tenantId: string,
type = 'username',
): Promise<IMemberIdentity | null> {
let memberIdentity: IMemberIdentity

try {
memberIdentity = await this.connection.oneOrNone(
`
select * from "memberIdentities" where value = $(username) and platform = $(platform)
and type = $(type) and verified = true and "tenantId" = $(tenantId)
`,
{
username,
platform,
type,
tenantId,
},
)
} catch (err) {
this.log.error('Error while finding member identity!', err)

throw new Error(err)
}

return memberIdentity
}
}

export default MemberRepository
Loading