Skip to content
Merged
10 changes: 6 additions & 4 deletions services/apps/merge_suggestions_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@ import {
getMembers,
getMembersForLLMConsumption,
getRawMemberMergeSuggestions,
removeRawMemberMergeSuggestions,
removeMemberMergeSuggestion,
updateMemberMergeSuggestionsLastGeneratedAt,
} from './activities/memberMergeSuggestions'
import {
addOrganizationSuggestionToNoMerge,
addOrganizationToMerge,
findTenantsLatestOrganizationSuggestionGeneratedAt,
getOrganizationMergeSuggestions,
getOrganizations,
getOrganizationsForLLMConsumption,
getRawOrganizationMergeSuggestions,
removeRawOrganizationMergeSuggestions,
removeOrganizationMergeSuggestions,
updateOrganizationMergeSuggestionsLastGeneratedAt,
} from './activities/organizationMergeSuggestions'

Expand All @@ -42,10 +43,11 @@ export {
getMembersForLLMConsumption,
getOrganizationsForLLMConsumption,
getRawOrganizationMergeSuggestions,
removeRawOrganizationMergeSuggestions,
removeOrganizationMergeSuggestions,
getRawMemberMergeSuggestions,
removeRawMemberMergeSuggestions,
removeMemberMergeSuggestion,
saveLLMVerdict,
mergeMembers,
mergeOrganizations,
addOrganizationSuggestionToNoMerge,
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { addMemberNoMerge } from '@crowd/data-access-layer/src/member_merge'
import { MemberField, queryMembers } from '@crowd/data-access-layer/src/members'
import MemberMergeSuggestionsRepository from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/memberMergeSuggestions.repo'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
Expand Down Expand Up @@ -351,10 +352,23 @@ export async function getRawMemberMergeSuggestions(
return memberMergeSuggestionsRepo.getRawMemberSuggestions(similarityFilter, limit)
}

export async function removeRawMemberMergeSuggestions(suggestion: string[]): Promise<void> {
export async function removeMemberMergeSuggestion(
suggestion: string[],
table: MemberMergeSuggestionTable,
): Promise<void> {
const memberMergeSuggestionsRepo = new MemberMergeSuggestionsRepository(
svc.postgres.writer.connection(),
svc.log,
)
await memberMergeSuggestionsRepo.removeRawMemberSuggestions(suggestion)
await memberMergeSuggestionsRepo.removeMemberMergeSuggestion(suggestion, table)
}

export async function addMemberSuggestionToNoMerge(suggestion: string[]): Promise<void> {
if (suggestion.length !== 2) {
svc.log.debug(`Suggestions array must have two ids!`)
return
}
const qx = pgpQx(svc.postgres.writer.connection())

await addMemberNoMerge(qx, suggestion[0], suggestion[1])
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { OrganizationField, findOrgById, queryOrgs } from '@crowd/data-access-layer'
import { hasLfxMembership } from '@crowd/data-access-layer/src/lfx_memberships'
import OrganizationMergeSuggestionsRepository from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/organizationMergeSuggestions.repo'
import { addOrgNoMerge } from '@crowd/data-access-layer/src/org_merge'
import { fetchOrgIdentities, findOrgAttributes } from '@crowd/data-access-layer/src/organizations'
import { QueryExecutor, pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { buildFullOrgForMergeSuggestions } from '@crowd/opensearch'
Expand Down Expand Up @@ -428,10 +429,23 @@ export async function getRawOrganizationMergeSuggestions(
return suggestions
}

export async function removeRawOrganizationMergeSuggestions(suggestion: string[]): Promise<void> {
export async function removeOrganizationMergeSuggestions(
suggestion: string[],
table: OrganizationMergeSuggestionTable,
): Promise<void> {
const organizationMergeSuggestionsRepo = new OrganizationMergeSuggestionsRepository(
svc.postgres.writer.connection(),
svc.log,
)
await organizationMergeSuggestionsRepo.removeRawOrganizationMergeSuggestions(suggestion)
await organizationMergeSuggestionsRepo.removeOrganizationMergeSuggestions(suggestion, table)
}

export async function addOrganizationSuggestionToNoMerge(suggestion: string[]): Promise<void> {
if (suggestion.length !== 2) {
svc.log.debug(`Suggestions array must have two ids!`)
return
}
const qx = pgpQx(svc.postgres.writer.connection())

await addOrgNoMerge(qx, suggestion[0], suggestion[1])
Comment on lines +447 to +454
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance error handling and logging

The function could be improved in several ways:

  1. Throw an error instead of silently returning on invalid input
  2. Add error handling for database operations
  3. Add logging for successful operations
  4. Add type validation for array elements

Consider this implementation:

 export async function addOrganizationSuggestionToNoMerge(suggestion: string[]): Promise<void> {
   if (suggestion.length !== 2) {
-    svc.log.debug(`Suggestions array must have two ids!`)
-    return
+    throw new Error('Suggestions array must have exactly two organization IDs')
   }
+
+  if (!suggestion.every(id => typeof id === 'string' && id.length > 0)) {
+    throw new Error('All organization IDs must be non-empty strings')
+  }
+
   const qx = pgpQx(svc.postgres.writer.connection())
 
-  await addOrgNoMerge(qx, suggestion[0], suggestion[1])
+  try {
+    await addOrgNoMerge(qx, suggestion[0], suggestion[1])
+    svc.log.info(
+      { primaryId: suggestion[0], secondaryId: suggestion[1] },
+      'Successfully added organization pair to no-merge list'
+    )
+  } catch (error) {
+    svc.log.error(
+      { error, primaryId: suggestion[0], secondaryId: suggestion[1] },
+      'Failed to add organization pair to no-merge list'
+    )
+    throw error
+  }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export async function addOrganizationSuggestionToNoMerge(suggestion: string[]): Promise<void> {
if (suggestion.length !== 2) {
svc.log.debug(`Suggestions array must have two ids!`)
return
}
const qx = pgpQx(svc.postgres.writer.connection())
await addOrgNoMerge(qx, suggestion[0], suggestion[1])
export async function addOrganizationSuggestionToNoMerge(suggestion: string[]): Promise<void> {
if (suggestion.length !== 2) {
throw new Error('Suggestions array must have exactly two organization IDs')
}
if (!suggestion.every(id => typeof id === 'string' && id.length > 0)) {
throw new Error('All organization IDs must be non-empty strings')
}
const qx = pgpQx(svc.postgres.writer.connection())
try {
await addOrgNoMerge(qx, suggestion[0], suggestion[1])
svc.log.info(
{ primaryId: suggestion[0], secondaryId: suggestion[1] },
'Successfully added organization pair to no-merge list'
)
} catch (error) {
svc.log.error(
{ error, primaryId: suggestion[0], secondaryId: suggestion[1] },
'Failed to add organization pair to no-merge list'
)
throw error
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export async function generateMemberMergeSuggestions(
): Promise<void> {
const PAGE_SIZE = 50
const PARALLEL_SUGGESTION_PROCESSING = 10
const SIMILARITY_CONFIDENCE_SCORE_THRESHOLD = 0.5
const SIMILARITY_CONFIDENCE_SCORE_THRESHOLD = 0.75

let lastUuid: string = args.lastUuid || null

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export async function generateOrganizationMergeSuggestions(
): Promise<void> {
const PAGE_SIZE = 25
const PARALLEL_SUGGESTION_PROCESSING = 50
const SIMILARITY_CONFIDENCE_SCORE_THRESHOLD = 0.5
const SIMILARITY_CONFIDENCE_SCORE_THRESHOLD = 0.75

let lastUuid: string = args.lastUuid || null

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { continueAsNew, proxyActivities } from '@temporalio/workflow'

import { LLMSuggestionVerdictType } from '@crowd/types'
import { LLMSuggestionVerdictType, MemberMergeSuggestionTable } from '@crowd/types'

import * as commonActivities from '../activities/common'
import * as memberActivities from '../activities/memberMergeSuggestions'
Expand Down Expand Up @@ -63,7 +63,14 @@ export async function mergeMembersWithLLM(

if (members.length !== 2) {
console.log(`Failed getting members data in suggestion. Skipping suggestion: ${suggestion}`)
await memberActivitiesProxy.removeRawMemberMergeSuggestions(suggestion)
await memberActivitiesProxy.removeMemberMergeSuggestion(
suggestion,
MemberMergeSuggestionTable.MEMBER_TO_MERGE_RAW,
)
await memberActivitiesProxy.removeMemberMergeSuggestion(
suggestion,
MemberMergeSuggestionTable.MEMBER_TO_MERGE_FILTERED,
)
continue
}

Expand All @@ -89,6 +96,19 @@ export async function mergeMembersWithLLM(

if (llmResult.body.content[0].text === 'true') {
await commonActivitiesProxy.mergeMembers(suggestion[0], suggestion[1], args.tenantId)
} else {
console.log(
`LLM doesn't think these members are the same. Removing from suggestions and adding to no merge: ${suggestion[0]} and ${suggestion[1]}!`,
)
await memberActivitiesProxy.removeMemberMergeSuggestion(
suggestion,
MemberMergeSuggestionTable.MEMBER_TO_MERGE_FILTERED,
)
await memberActivitiesProxy.removeMemberMergeSuggestion(
suggestion,
MemberMergeSuggestionTable.MEMBER_TO_MERGE_RAW,
)
await memberActivitiesProxy.addMemberSuggestionToNoMerge(suggestion)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { continueAsNew, proxyActivities } from '@temporalio/workflow'

import { LLMSuggestionVerdictType } from '@crowd/types'
import { LLMSuggestionVerdictType, OrganizationMergeSuggestionTable } from '@crowd/types'

import * as commonActivities from '../activities/common'
import * as organizationActivities from '../activities/organizationMergeSuggestions'
Expand Down Expand Up @@ -51,7 +51,14 @@ export async function mergeOrganizationsWithLLM(
console.log(
`Failed getting organization data in suggestion. Skipping suggestion: ${suggestion}`,
)
await organizationActivitiesProxy.removeRawOrganizationMergeSuggestions(suggestion)
await organizationActivitiesProxy.removeOrganizationMergeSuggestions(
suggestion,
OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_FILTERED,
)
await organizationActivitiesProxy.removeOrganizationMergeSuggestions(
suggestion,
OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_RAW,
)
continue
}

Expand Down Expand Up @@ -80,6 +87,19 @@ export async function mergeOrganizationsWithLLM(
`LLM verdict says these two orgs are the same. Merging organizations: ${suggestion[0]} and ${suggestion[1]}!`,
)
await commonActivitiesProxy.mergeOrganizations(suggestion[0], suggestion[1], args.tenantId)
} else {
console.log(
`LLM doesn't think these orgs are the same. Removing from suggestions and adding to no merge: ${suggestion[0]} and ${suggestion[1]}!`,
)
await organizationActivitiesProxy.removeOrganizationMergeSuggestions(
suggestion,
OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_FILTERED,
)
await organizationActivitiesProxy.removeOrganizationMergeSuggestions(
suggestion,
OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_RAW,
)
await organizationActivitiesProxy.addOrganizationSuggestionToNoMerge(suggestion)
}
}

Expand Down
1 change: 1 addition & 0 deletions services/libs/data-access-layer/src/member_merge/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export async function addMemberNoMerge(
`
INSERT INTO "memberNoMerge" ("memberId", "noMergeId", "createdAt", "updatedAt")
VALUES ($(memberId), $(noMergeId), $(createdAt), $(updatedAt))
on conflict ("memberId", "noMergeId") do nothing
`,
{
memberId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,12 @@ class MemberMergeSuggestionsRepository {
return results.map((r) => [r.memberId, r.toMergeId])
}

async removeRawMemberSuggestions(suggestion: string[]): Promise<void> {
async removeMemberMergeSuggestion(
suggestion: string[],
table: MemberMergeSuggestionTable,
): Promise<void> {
const query = `
delete from "memberToMergeRaw"
delete from "${table}"
where
("memberId" = $(memberId) and "toMergeId" = $(toMergeId))
or
Expand All @@ -301,7 +304,7 @@ class MemberMergeSuggestionsRepository {
try {
await this.connection.none(query, replacements)
} catch (error) {
this.log.error('Error removing raw member suggestions', error)
this.log.error(`Error removing member suggestions from ${table}`, error)
throw error
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,12 @@ class OrganizationMergeSuggestionsRepository {
return results.map((r) => [r.organizationId, r.toMergeId])
}

async removeRawOrganizationMergeSuggestions(suggestion: string[]): Promise<void> {
async removeOrganizationMergeSuggestions(
suggestion: string[],
table: OrganizationMergeSuggestionTable,
): Promise<void> {
const query = `
delete from "organizationToMergeRaw"
delete from "${table}"
where
("organizationId" = $(organizationId) and "toMergeId" = $(toMergeId))
or
Expand All @@ -318,7 +321,7 @@ class OrganizationMergeSuggestionsRepository {
try {
await this.connection.none(query, replacements)
} catch (error) {
this.log.error('Error removing raw organization suggestions', error)
this.log.error(`Error removing organization suggestions rom ${table}!`, error)
throw error
}
}
Expand Down
21 changes: 21 additions & 0 deletions services/libs/data-access-layer/src/org_merge/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,24 @@ export async function findOrgNoMergeIds(

return rows.map((row: { noMergeId: string }) => row.noMergeId)
}

export async function addOrgNoMerge(
qx: QueryExecutor,
organizationId: string,
noMergeId: string,
): Promise<void> {
const currentTime = new Date()
await qx.result(
`
INSERT INTO "organizationNoMerge" ("organizationId", "noMergeId", "createdAt", "updatedAt")
VALUES ($(organizationId), $(noMergeId), $(createdAt), $(updatedAt))
on conflict ("organizationId", "noMergeId") do nothing
`,
{
organizationId,
noMergeId,
createdAt: currentTime,
updatedAt: currentTime,
},
)
}
Loading