Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions services/apps/script_executor_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import {
findMemberIdentitiesGroupedByPlatform,
findMemberMergeActions,
} from './activities/dissect-member'
import {
findActivitiesWithWrongMembers,
findMemberIdentity,
updateActivities,
} from './activities/fix-misattributed-activities'
import {
deleteOrganizationIdentity,
findOrganizationIdentity,
Expand Down Expand Up @@ -38,4 +43,7 @@ export {
updateOrganizationIdentity,
deleteOrganizationIdentity,
isLfxMember,
findActivitiesWithWrongMembers,
findMemberIdentity,
updateActivities,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import {
ActivityRepository,
IActivityWithWrongMember,
} from '@crowd/data-access-layer/src/old/apps/script_executor_worker/activities.repo'
import MemberRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/member.repo'
import { IMemberIdentity } from '@crowd/types'

import { svc } from '../../main'

export async function findActivitiesWithWrongMembers(limit: number) {
let activitiesWithWrongMember: IActivityWithWrongMember[] = []

try {
const activityRepo = new ActivityRepository(svc.postgres.reader.connection(), svc.log)
activitiesWithWrongMember = await activityRepo.getActivitiesWithWrongMembers(limit)
} catch (err) {
throw new Error(err)
}

return activitiesWithWrongMember
}

export async function findMemberIdentity(username: string, platform: string) {
let memberIdentity: IMemberIdentity

try {
const memberRepo = new MemberRepository(svc.postgres.reader.connection(), svc.log)
memberIdentity = await memberRepo.findMemberIdentity(username, platform)
} catch (err) {
throw new Error(err)
}

return memberIdentity
}

export async function updateActivities(
username: string,
platform: string,
correctMemberId: string,
) {
try {
const activityRepo = new ActivityRepository(svc.postgres.writer.connection(), svc.log)
await activityRepo.updateActivities(username, platform, correctMemberId)
} catch (err) {
throw new Error(err)
}
}
4 changes: 4 additions & 0 deletions services/apps/script_executor_worker/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ export interface IFixOrgIdentitiesWithWrongUrlsArgs {
tenantId: string
testRun?: boolean
}

export interface IFixMisattributedActivitiesArgs {
testRun?: boolean
}
2 changes: 2 additions & 0 deletions services/apps/script_executor_worker/src/workflows.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { dissectMember } from './workflows/dissectMember'
import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization'
import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms'
import { fixMisattributedActivities } from './workflows/fixMisattributedActivities'
import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls'

export {
findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms,
findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization,
dissectMember,
fixOrgIdentitiesWithWrongUrls,
fixMisattributedActivities,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { continueAsNew, proxyActivities } from '@temporalio/workflow'

import * as activities from '../activities/fix-misattributed-activities'
import { IFixMisattributedActivitiesArgs } from '../types'

const activity = proxyActivities<typeof activities>({
startToCloseTimeout: '3 minute',
retry: { maximumAttempts: 3 },
})

export async function fixMisattributedActivities(
args: IFixMisattributedActivitiesArgs,
): Promise<void> {
const PROCESS_ACTIVITIES_PER_RUN = args.testRun ? 10 : 100

if (args.testRun) {
console.log(`Running in test mode with limit 10!`)
}

const activitiesWithWrongMember = await activity.findActivitiesWithWrongMembers(
PROCESS_ACTIVITIES_PER_RUN,
)

if (!activitiesWithWrongMember.length) {
console.log(`No activities found with misattributed members!`)
return
}

for (const a of activitiesWithWrongMember) {
const memberIdentity = await activity.findMemberIdentity(a.username, a.platform)
console.log('activity with wrong member', a)
console.log('memberIdentity found for username and platform', memberIdentity)
// if (memberIdentity) {
// await activity.updateActivities(a.username, a.platform, memberIdentity.memberId)
// }
}

if (!args.testRun) {
await continueAsNew<typeof fixMisattributedActivities>({
testRun: args.testRun,
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { DbConnection, DbTransaction } from '@crowd/database'
import { Logger } from '@crowd/logging'

export interface IActivityWithWrongMember {
username: string
platform: string
activityCount: number
}

export class ActivityRepository {
constructor(
private readonly connection: DbConnection | DbTransaction,
private readonly log: Logger,
) {}

async getActivitiesWithWrongMembers(limit = 100): Promise<IActivityWithWrongMember[]> {
try {
return await this.connection.query(
`
SELECT
a.username,
a.platform,
COUNT(*) as "activityCount"
FROM activities a
JOIN "memberIdentities" mi ON a.username = mi.value
AND a.platform = mi.platform
AND mi.type = 'username'
AND mi."verified" = true
WHERE a."memberId" <> mi."memberId"
GROUP BY a.username, a.platform
ORDER BY COUNT(*) DESC
LIMIT $(limit)
`,
{
limit,
},
)
} catch (err) {
this.log.error('Error while finding activities!', err)
throw new Error(err)
}
}

async updateActivities(
username: string,
platform: string,
correctMemberId: string,
): Promise<void> {
try {
await this.connection.none(
`
UPDATE activities
SET "memberId" = $(correctMemberId)
WHERE "username" = $(username)
AND platform = $(platform)
`,
{
username,
platform,
correctMemberId,
},
)
} catch (err) {
this.log.error('Error while updating activities!', err)
throw new Error(err)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { DbConnection, DbTransaction } from '@crowd/database'
import { Logger } from '@crowd/logging'
import { IMember } from '@crowd/types'
import { IMember, IMemberIdentity } from '@crowd/types'

import { IFindMemberIdentitiesGroupedByPlatformResult, ISimilarMember } from './types'

Expand Down Expand Up @@ -159,6 +159,34 @@ class MemberRepository {

return member
}

async findMemberIdentity(
username: string,
platform: string,
type = 'username',
): Promise<IMemberIdentity | null> {
let memberIdentity: IMemberIdentity

try {
memberIdentity = await this.connection.oneOrNone(
`
select * from "memberIdentities" where value = $(username) and platform = $(platform)
and type = $(type) and verified = true
`,
{
username,
platform,
type,
},
)
} catch (err) {
this.log.error('Error while finding member identity!', err)

throw new Error(err)
}

return memberIdentity
}
}

export default MemberRepository
Loading