diff --git a/services/apps/merge_suggestions_worker/src/activities.ts b/services/apps/merge_suggestions_worker/src/activities.ts index ecd9698c99..b91fc8dc40 100644 --- a/services/apps/merge_suggestions_worker/src/activities.ts +++ b/services/apps/merge_suggestions_worker/src/activities.ts @@ -12,17 +12,18 @@ import { getMembers, getMembersForLLMConsumption, getRawMemberMergeSuggestions, - removeRawMemberMergeSuggestions, + removeMemberMergeSuggestion, updateMemberMergeSuggestionsLastGeneratedAt, } from './activities/memberMergeSuggestions' import { + addOrganizationSuggestionToNoMerge, addOrganizationToMerge, findTenantsLatestOrganizationSuggestionGeneratedAt, getOrganizationMergeSuggestions, getOrganizations, getOrganizationsForLLMConsumption, getRawOrganizationMergeSuggestions, - removeRawOrganizationMergeSuggestions, + removeOrganizationMergeSuggestions, updateOrganizationMergeSuggestionsLastGeneratedAt, } from './activities/organizationMergeSuggestions' @@ -42,10 +43,11 @@ export { getMembersForLLMConsumption, getOrganizationsForLLMConsumption, getRawOrganizationMergeSuggestions, - removeRawOrganizationMergeSuggestions, + removeOrganizationMergeSuggestions, getRawMemberMergeSuggestions, - removeRawMemberMergeSuggestions, + removeMemberMergeSuggestion, saveLLMVerdict, mergeMembers, mergeOrganizations, + addOrganizationSuggestionToNoMerge, } diff --git a/services/apps/merge_suggestions_worker/src/activities/memberMergeSuggestions.ts b/services/apps/merge_suggestions_worker/src/activities/memberMergeSuggestions.ts index f5962b4931..a2a21a746f 100644 --- a/services/apps/merge_suggestions_worker/src/activities/memberMergeSuggestions.ts +++ b/services/apps/merge_suggestions_worker/src/activities/memberMergeSuggestions.ts @@ -1,4 +1,5 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ +import { addMemberNoMerge } from '@crowd/data-access-layer/src/member_merge' import { MemberField, queryMembers } from '@crowd/data-access-layer/src/members' import 
MemberMergeSuggestionsRepository from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/memberMergeSuggestions.repo' import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' @@ -351,10 +352,23 @@ export async function getRawMemberMergeSuggestions( return memberMergeSuggestionsRepo.getRawMemberSuggestions(similarityFilter, limit) } -export async function removeRawMemberMergeSuggestions(suggestion: string[]): Promise { +export async function removeMemberMergeSuggestion( + suggestion: string[], + table: MemberMergeSuggestionTable, +): Promise { const memberMergeSuggestionsRepo = new MemberMergeSuggestionsRepository( svc.postgres.writer.connection(), svc.log, ) - await memberMergeSuggestionsRepo.removeRawMemberSuggestions(suggestion) + await memberMergeSuggestionsRepo.removeMemberMergeSuggestion(suggestion, table) +} + +export async function addMemberSuggestionToNoMerge(suggestion: string[]): Promise { + if (suggestion.length !== 2) { + svc.log.debug(`Suggestions array must have two ids!`) + return + } + const qx = pgpQx(svc.postgres.writer.connection()) + + await addMemberNoMerge(qx, suggestion[0], suggestion[1]) } diff --git a/services/apps/merge_suggestions_worker/src/activities/organizationMergeSuggestions.ts b/services/apps/merge_suggestions_worker/src/activities/organizationMergeSuggestions.ts index a058ca644b..1231cba007 100644 --- a/services/apps/merge_suggestions_worker/src/activities/organizationMergeSuggestions.ts +++ b/services/apps/merge_suggestions_worker/src/activities/organizationMergeSuggestions.ts @@ -1,6 +1,7 @@ import { OrganizationField, findOrgById, queryOrgs } from '@crowd/data-access-layer' import { hasLfxMembership } from '@crowd/data-access-layer/src/lfx_memberships' import OrganizationMergeSuggestionsRepository from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/organizationMergeSuggestions.repo' +import { addOrgNoMerge } from '@crowd/data-access-layer/src/org_merge' import { fetchOrgIdentities, 
findOrgAttributes } from '@crowd/data-access-layer/src/organizations' import { QueryExecutor, pgpQx } from '@crowd/data-access-layer/src/queryExecutor' import { buildFullOrgForMergeSuggestions } from '@crowd/opensearch' @@ -357,8 +358,12 @@ async function prepareOrg( findOrgAttributes(qx, organizationId), ]) + if (!base) { + return null + } + return { - displayName: base.displayName, + displayName: base?.displayName || '', description: base.description, phoneNumbers: attributes.filter((a) => a.name === 'phoneNumber').map((a) => a.value), logo: base.logo, @@ -389,7 +394,7 @@ export async function getOrganizationsForLLMConsumption( }), ) - return result + return result.filter((r) => r !== null) } export async function getRawOrganizationMergeSuggestions( @@ -428,10 +433,23 @@ export async function getRawOrganizationMergeSuggestions( return suggestions } -export async function removeRawOrganizationMergeSuggestions(suggestion: string[]): Promise { +export async function removeOrganizationMergeSuggestions( + suggestion: string[], + table: OrganizationMergeSuggestionTable, +): Promise { const organizationMergeSuggestionsRepo = new OrganizationMergeSuggestionsRepository( svc.postgres.writer.connection(), svc.log, ) - await organizationMergeSuggestionsRepo.removeRawOrganizationMergeSuggestions(suggestion) + await organizationMergeSuggestionsRepo.removeOrganizationMergeSuggestions(suggestion, table) +} + +export async function addOrganizationSuggestionToNoMerge(suggestion: string[]): Promise { + if (suggestion.length !== 2) { + svc.log.debug(`Suggestions array must have two ids!`) + return + } + const qx = pgpQx(svc.postgres.writer.connection()) + + await addOrgNoMerge(qx, suggestion[0], suggestion[1]) } diff --git a/services/apps/merge_suggestions_worker/src/workflows/generateMemberMergeSuggestions.ts b/services/apps/merge_suggestions_worker/src/workflows/generateMemberMergeSuggestions.ts index 5f17a5cf55..0e016b1d4a 100644 --- 
a/services/apps/merge_suggestions_worker/src/workflows/generateMemberMergeSuggestions.ts +++ b/services/apps/merge_suggestions_worker/src/workflows/generateMemberMergeSuggestions.ts @@ -17,7 +17,7 @@ export async function generateMemberMergeSuggestions( ): Promise { const PAGE_SIZE = 50 const PARALLEL_SUGGESTION_PROCESSING = 10 - const SIMILARITY_CONFIDENCE_SCORE_THRESHOLD = 0.5 + const SIMILARITY_CONFIDENCE_SCORE_THRESHOLD = 0.75 let lastUuid: string = args.lastUuid || null diff --git a/services/apps/merge_suggestions_worker/src/workflows/generateOrganizationMergeSuggestions.ts b/services/apps/merge_suggestions_worker/src/workflows/generateOrganizationMergeSuggestions.ts index 148501b707..82993e50cb 100644 --- a/services/apps/merge_suggestions_worker/src/workflows/generateOrganizationMergeSuggestions.ts +++ b/services/apps/merge_suggestions_worker/src/workflows/generateOrganizationMergeSuggestions.ts @@ -17,7 +17,7 @@ export async function generateOrganizationMergeSuggestions( ): Promise { const PAGE_SIZE = 25 const PARALLEL_SUGGESTION_PROCESSING = 50 - const SIMILARITY_CONFIDENCE_SCORE_THRESHOLD = 0.5 + const SIMILARITY_CONFIDENCE_SCORE_THRESHOLD = 0.75 let lastUuid: string = args.lastUuid || null diff --git a/services/apps/merge_suggestions_worker/src/workflows/mergeMembersWithLLM.ts b/services/apps/merge_suggestions_worker/src/workflows/mergeMembersWithLLM.ts index f2d0c3719a..65d8b1433a 100644 --- a/services/apps/merge_suggestions_worker/src/workflows/mergeMembersWithLLM.ts +++ b/services/apps/merge_suggestions_worker/src/workflows/mergeMembersWithLLM.ts @@ -1,6 +1,6 @@ import { continueAsNew, proxyActivities } from '@temporalio/workflow' -import { LLMSuggestionVerdictType } from '@crowd/types' +import { LLMSuggestionVerdictType, MemberMergeSuggestionTable } from '@crowd/types' import * as commonActivities from '../activities/common' import * as memberActivities from '../activities/memberMergeSuggestions' @@ -63,7 +63,14 @@ export async function 
mergeMembersWithLLM( if (members.length !== 2) { console.log(`Failed getting members data in suggestion. Skipping suggestion: ${suggestion}`) - await memberActivitiesProxy.removeRawMemberMergeSuggestions(suggestion) + await memberActivitiesProxy.removeMemberMergeSuggestion( + suggestion, + MemberMergeSuggestionTable.MEMBER_TO_MERGE_RAW, + ) + await memberActivitiesProxy.removeMemberMergeSuggestion( + suggestion, + MemberMergeSuggestionTable.MEMBER_TO_MERGE_FILTERED, + ) continue } @@ -89,6 +96,19 @@ export async function mergeMembersWithLLM( if (llmResult.body.content[0].text === 'true') { await commonActivitiesProxy.mergeMembers(suggestion[0], suggestion[1], args.tenantId) + } else { + console.log( + `LLM doesn't think these members are the same. Removing from suggestions and adding to no merge: ${suggestion[0]} and ${suggestion[1]}!`, + ) + await memberActivitiesProxy.removeMemberMergeSuggestion( + suggestion, + MemberMergeSuggestionTable.MEMBER_TO_MERGE_FILTERED, + ) + await memberActivitiesProxy.removeMemberMergeSuggestion( + suggestion, + MemberMergeSuggestionTable.MEMBER_TO_MERGE_RAW, + ) + await memberActivitiesProxy.addMemberSuggestionToNoMerge(suggestion) } } diff --git a/services/apps/merge_suggestions_worker/src/workflows/mergeOrganizationsWithLLM.ts b/services/apps/merge_suggestions_worker/src/workflows/mergeOrganizationsWithLLM.ts index 9267674276..ffe9c75134 100644 --- a/services/apps/merge_suggestions_worker/src/workflows/mergeOrganizationsWithLLM.ts +++ b/services/apps/merge_suggestions_worker/src/workflows/mergeOrganizationsWithLLM.ts @@ -1,6 +1,6 @@ import { continueAsNew, proxyActivities } from '@temporalio/workflow' -import { LLMSuggestionVerdictType } from '@crowd/types' +import { LLMSuggestionVerdictType, OrganizationMergeSuggestionTable } from '@crowd/types' import * as commonActivities from '../activities/common' import * as organizationActivities from '../activities/organizationMergeSuggestions' @@ -51,7 +51,14 @@ export async function 
mergeOrganizationsWithLLM( console.log( `Failed getting organization data in suggestion. Skipping suggestion: ${suggestion}`, ) - await organizationActivitiesProxy.removeRawOrganizationMergeSuggestions(suggestion) + await organizationActivitiesProxy.removeOrganizationMergeSuggestions( + suggestion, + OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_FILTERED, + ) + await organizationActivitiesProxy.removeOrganizationMergeSuggestions( + suggestion, + OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_RAW, + ) continue } @@ -80,6 +87,19 @@ export async function mergeOrganizationsWithLLM( `LLM verdict says these two orgs are the same. Merging organizations: ${suggestion[0]} and ${suggestion[1]}!`, ) await commonActivitiesProxy.mergeOrganizations(suggestion[0], suggestion[1], args.tenantId) + } else { + console.log( + `LLM doesn't think these orgs are the same. Removing from suggestions and adding to no merge: ${suggestion[0]} and ${suggestion[1]}!`, + ) + await organizationActivitiesProxy.removeOrganizationMergeSuggestions( + suggestion, + OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_FILTERED, + ) + await organizationActivitiesProxy.removeOrganizationMergeSuggestions( + suggestion, + OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_RAW, + ) + await organizationActivitiesProxy.addOrganizationSuggestionToNoMerge(suggestion) } } diff --git a/services/apps/search_sync_worker/src/bin/sync-all-members.ts b/services/apps/search_sync_worker/src/bin/sync-all-members.ts index d3dfb08f5f..3669243eec 100644 --- a/services/apps/search_sync_worker/src/bin/sync-all-members.ts +++ b/services/apps/search_sync_worker/src/bin/sync-all-members.ts @@ -30,6 +30,11 @@ setImmediate(async () => { await indexingRepo.deleteIndexedEntities(IndexedEntityType.MEMBER) } + let withAggs = true + if (processArguments.includes('--no-aggs')) { + withAggs = false + } + const repo = new MemberRepository(store, log) const tenantIds = await repo.getTenantIds() @@ -49,7 +54,7 @@ 
setImmediate(async () => { log.info(`Processing tenant ${i + 1}/${tenantIds.length}`) current += 1 service - .syncTenantMembers(tenantId, 500) + .syncTenantMembers(tenantId, 500, { withAggs }) .then(() => { current-- log.info(`Processed tenant ${i + 1}/${tenantIds.length}`) diff --git a/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts b/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts index 8674c79c7c..de9ebc24f1 100644 --- a/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts +++ b/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts @@ -45,6 +45,12 @@ setImmediate(async () => { await indexingRepo.deleteIndexedEntities(IndexedEntityType.ORGANIZATION) } + let withAggs = true + + if (processArguments.includes('--no-aggs')) { + withAggs = false + } + const readStore = new DbStore(log, readHost) const repo = new OrganizationRepository(readStore, log) @@ -69,7 +75,7 @@ setImmediate(async () => { log.info(`Processing tenant ${i + 1}/${tenantIds.length}`) current += 1 service - .syncTenantOrganizations(tenantId, 500) + .syncTenantOrganizations(tenantId, 500, { withAggs }) .then(() => { current-- log.info(`Processed tenant ${i + 1}/${tenantIds.length}`) diff --git a/services/libs/data-access-layer/src/member_merge/index.ts b/services/libs/data-access-layer/src/member_merge/index.ts index 20a8cf79d4..6033e37427 100644 --- a/services/libs/data-access-layer/src/member_merge/index.ts +++ b/services/libs/data-access-layer/src/member_merge/index.ts @@ -28,6 +28,7 @@ export async function addMemberNoMerge( ` INSERT INTO "memberNoMerge" ("memberId", "noMergeId", "createdAt", "updatedAt") VALUES ($(memberId), $(noMergeId), $(createdAt), $(updatedAt)) + on conflict ("memberId", "noMergeId") do nothing `, { memberId, diff --git a/services/libs/data-access-layer/src/old/apps/merge_suggestions_worker/memberMergeSuggestions.repo.ts 
b/services/libs/data-access-layer/src/old/apps/merge_suggestions_worker/memberMergeSuggestions.repo.ts index eb9b4c9a79..ca828a014d 100644 --- a/services/libs/data-access-layer/src/old/apps/merge_suggestions_worker/memberMergeSuggestions.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/merge_suggestions_worker/memberMergeSuggestions.repo.ts @@ -284,9 +284,12 @@ class MemberMergeSuggestionsRepository { return results.map((r) => [r.memberId, r.toMergeId]) } - async removeRawMemberSuggestions(suggestion: string[]): Promise { + async removeMemberMergeSuggestion( + suggestion: string[], + table: MemberMergeSuggestionTable, + ): Promise { const query = ` - delete from "memberToMergeRaw" + delete from "${table}" where ("memberId" = $(memberId) and "toMergeId" = $(toMergeId)) or @@ -301,7 +304,7 @@ class MemberMergeSuggestionsRepository { try { await this.connection.none(query, replacements) } catch (error) { - this.log.error('Error removing raw member suggestions', error) + this.log.error(`Error removing member suggestions from ${table}`, error) throw error } } diff --git a/services/libs/data-access-layer/src/old/apps/merge_suggestions_worker/organizationMergeSuggestions.repo.ts b/services/libs/data-access-layer/src/old/apps/merge_suggestions_worker/organizationMergeSuggestions.repo.ts index c25b858486..b97891a811 100644 --- a/services/libs/data-access-layer/src/old/apps/merge_suggestions_worker/organizationMergeSuggestions.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/merge_suggestions_worker/organizationMergeSuggestions.repo.ts @@ -301,9 +301,12 @@ class OrganizationMergeSuggestionsRepository { return results.map((r) => [r.organizationId, r.toMergeId]) } - async removeRawOrganizationMergeSuggestions(suggestion: string[]): Promise { + async removeOrganizationMergeSuggestions( + suggestion: string[], + table: OrganizationMergeSuggestionTable, + ): Promise { const query = ` - delete from "organizationToMergeRaw" + delete from "${table}" where 
("organizationId" = $(organizationId) and "toMergeId" = $(toMergeId)) or @@ -318,7 +321,7 @@ class OrganizationMergeSuggestionsRepository { try { await this.connection.none(query, replacements) } catch (error) { - this.log.error('Error removing raw organization suggestions', error) + this.log.error(`Error removing organization suggestions from ${table}!`, error) throw error } } diff --git a/services/libs/data-access-layer/src/org_merge/index.ts b/services/libs/data-access-layer/src/org_merge/index.ts index ad0070055c..f93ade85a0 100644 --- a/services/libs/data-access-layer/src/org_merge/index.ts +++ b/services/libs/data-access-layer/src/org_merge/index.ts @@ -18,3 +18,24 @@ export async function findOrgNoMergeIds( return rows.map((row: { noMergeId: string }) => row.noMergeId) } + +export async function addOrgNoMerge( + qx: QueryExecutor, + organizationId: string, + noMergeId: string, +): Promise { + const currentTime = new Date() + await qx.result( + ` + INSERT INTO "organizationNoMerge" ("organizationId", "noMergeId", "createdAt", "updatedAt") + VALUES ($(organizationId), $(noMergeId), $(createdAt), $(updatedAt)) + on conflict ("organizationId", "noMergeId") do nothing + `, + { + organizationId, + noMergeId, + createdAt: currentTime, + updatedAt: currentTime, + }, + ) +} diff --git a/services/libs/opensearch/src/service/member.sync.service.ts b/services/libs/opensearch/src/service/member.sync.service.ts index abae81d299..1111374d8d 100644 --- a/services/libs/opensearch/src/service/member.sync.service.ts +++ b/services/libs/opensearch/src/service/member.sync.service.ts @@ -266,7 +266,11 @@ export class MemberSyncService { } } - public async syncTenantMembers(tenantId: string, batchSize = 200): Promise { + public async syncTenantMembers( + tenantId: string, + batchSize = 200, + opts: { withAggs?: boolean } = { withAggs: true }, + ): Promise { this.log.debug({ tenantId }, 'Syncing all tenant members!') let docCount = 0 let memberCount = 0 @@ -277,7 +281,7 @@ export
class MemberSyncService { while (memberIds.length > 0) { for (const memberId of memberIds) { - const { membersSynced, documentsIndexed } = await this.syncMembers(memberId) + const { membersSynced, documentsIndexed } = await this.syncMembers(memberId, opts) docCount += documentsIndexed memberCount += membersSynced diff --git a/services/libs/opensearch/src/service/organization.sync.service.ts b/services/libs/opensearch/src/service/organization.sync.service.ts index 91bc88db27..a611b0cabe 100644 --- a/services/libs/opensearch/src/service/organization.sync.service.ts +++ b/services/libs/opensearch/src/service/organization.sync.service.ts @@ -245,7 +245,11 @@ export class OrganizationSyncService { } } - public async syncTenantOrganizations(tenantId: string, batchSize = 200): Promise { + public async syncTenantOrganizations( + tenantId: string, + batchSize = 200, + opts: { withAggs?: boolean } = { withAggs: true }, + ): Promise { this.log.warn({ tenantId }, 'Syncing all tenant organizations!') let docCount = 0 let organizationCount = 0 @@ -263,7 +267,7 @@ export class OrganizationSyncService { while (organizationIds.length > 0) { const { organizationsSynced, documentsIndexed } = await this.syncOrganizations( organizationIds, - { withAggs: true }, + opts, ) organizationCount += organizationsSynced @@ -471,9 +475,7 @@ export class OrganizationSyncService { } } - public static async prefixData( - data: IOrganizationFullAggregatesOpensearch, - ): Promise { + public static prefixData(data: IOrganizationFullAggregatesOpensearch): IOrganizationOpensearch { return { uuid_organizationId: data.id, uuid_tenantId: data.tenantId,