diff --git a/services/apps/members_enrichment_worker/src/activities.ts b/services/apps/members_enrichment_worker/src/activities.ts
index 3853e32133..1686b5c1d5 100644
--- a/services/apps/members_enrichment_worker/src/activities.ts
+++ b/services/apps/members_enrichment_worker/src/activities.ts
@@ -8,6 +8,7 @@ import {
   getEnrichmentInput,
   getObsoleteSourcesOfMember,
   getTenantPriorityArray,
+  hasRemainingCredits,
   insertMemberEnrichmentCache,
   isCacheObsolete,
   isEnrichableBySource,
@@ -75,4 +76,5 @@ export {
   updateMemberUsingSquashedPayload,
   getTenantPriorityArray,
   cleanAttributeValue,
+  hasRemainingCredits,
 }
diff --git a/services/apps/members_enrichment_worker/src/activities/enrichment.ts b/services/apps/members_enrichment_worker/src/activities/enrichment.ts
index 0492ef2c03..aca34c5622 100644
--- a/services/apps/members_enrichment_worker/src/activities/enrichment.ts
+++ b/services/apps/members_enrichment_worker/src/activities/enrichment.ts
@@ -1,6 +1,11 @@
 import _ from 'lodash'
 
-import { generateUUIDv1, replaceDoubleQuotes, setAttributesDefaultValues } from '@crowd/common'
+import {
+  generateUUIDv1,
+  hasIntersection,
+  replaceDoubleQuotes,
+  setAttributesDefaultValues,
+} from '@crowd/common'
 import { LlmService } from '@crowd/common_services'
 import {
   updateMemberAttributes,
@@ -26,6 +31,7 @@ import {
 import { findOrCreateOrganization } from '@crowd/data-access-layer/src/organizations'
 import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor'
 import { refreshMaterializedView } from '@crowd/data-access-layer/src/utils'
+import { SearchSyncApiClient } from '@crowd/opensearch'
 import { RedisCache } from '@crowd/redis'
 import {
   IEnrichableMember,
@@ -37,6 +43,7 @@ import {
   MemberEnrichmentSource,
   MemberIdentityType,
   OrganizationAttributeSource,
+  OrganizationIdentityType,
   OrganizationSource,
   PlatformType,
 } from '@crowd/types'
@@ -65,7 +72,7 @@ export async function getEnrichmentData(
   input: IEnrichmentSourceInput,
 ): Promise {
   const service = EnrichmentSourceServiceFactory.getEnrichmentSourceService(source, svc.log)
-  if ((await service.isEnrichableBySource(input)) && (await hasRemainingCredits(source))) {
+  if (await service.isEnrichableBySource(input)) {
     return service.getData(input)
   }
   return null
@@ -220,6 +227,7 @@ export async function updateMemberUsingSquashedPayload(
   existingMemberData: IMemberOriginalData,
   squashedPayload: IMemberEnrichmentDataNormalized,
   hasContributions: boolean,
+  isHighConfidenceSourceSelectedForWorkExperiences: boolean,
 ): Promise {
   return await svc.postgres.writer.transactionally(async (tx) => {
     let updated = false
@@ -228,7 +236,7 @@ export async function updateMemberUsingSquashedPayload(
 
     // process identities
     if (squashedPayload.identities.length > 0) {
-      svc.log.info({ memberId }, 'Adding to member identities!')
+      svc.log.debug({ memberId }, 'Adding to member identities!')
       for (const i of squashedPayload.identities) {
         updated = true
         promises.push(
@@ -247,7 +255,7 @@ export async function updateMemberUsingSquashedPayload(
     // process contributions
     // if squashed payload has data from progai, we should fetch contributions here
     // it's ommited from the payload because it takes a lot of space
-    svc.log.info('Processing contributions! ', { memberId, hasContributions })
+    svc.log.debug('Processing contributions! ', { memberId, hasContributions })
     if (hasContributions) {
       promises.push(
         findMemberEnrichmentCache([MemberEnrichmentSource.PROGAI], memberId)
@@ -265,7 +273,6 @@ export async function updateMemberUsingSquashedPayload(
           .then((normalized) => {
             if (normalized) {
               const typed = normalized as IMemberEnrichmentDataNormalized
-              svc.log.info('Normalized contributions: ', { contributions: typed.contributions })
 
               if (typed.contributions) {
                 updated = true
@@ -280,7 +287,7 @@ export async function updateMemberUsingSquashedPayload(
     let attributes = existingMemberData.attributes as Record
 
     if (squashedPayload.attributes) {
-      svc.log.info({ memberId }, 'Updating member attributes!')
+      svc.log.debug({ memberId }, 'Updating member attributes!')
 
       attributes = _.merge({}, attributes, squashedPayload.attributes)
 
@@ -298,7 +305,7 @@ export async function updateMemberUsingSquashedPayload(
 
     // process reach
     if (squashedPayload.reach && Object.keys(squashedPayload.reach).length > 0) {
-      svc.log.info({ memberId }, 'Updating member reach!')
+      svc.log.debug({ memberId }, 'Updating member reach!')
 
       let reach: IMemberReach
       if (existingMemberData.reach && existingMemberData.reach.total) {
@@ -317,9 +324,35 @@ export async function updateMemberUsingSquashedPayload(
       }
     }
 
+    const orgIdsToSync: string[] = []
+
     if (squashedPayload.memberOrganizations.length > 0) {
       const orgPromises = []
+
+      // try matching member's existing organizations with the new ones
+      // we'll be using displayName, title, dates
       for (const org of squashedPayload.memberOrganizations) {
+        if (!org.organizationId) {
+          // Check if any similar in existing work experiences
+          const existingOrg = existingMemberData.organizations.find((o) =>
+            doesIncomingOrgExistInExistingOrgs(o, org),
+          )
+
+          if (existingOrg) {
+            // Get all orgs with the same name as the current one
+            const matchingOrgs = squashedPayload.memberOrganizations.filter(
+              (otherOrg) => otherOrg.name === org.name,
+            )
+
+            // Set organizationId for all matching orgs
+            for (const matchingOrg of matchingOrgs) {
+              matchingOrg.organizationId = existingOrg.orgId
+            }
+          }
+        }
+      }
+
+      for (const org of squashedPayload.memberOrganizations.filter((o) => !o.organizationId)) {
         orgPromises.push(
           findOrCreateOrganization(
             qx,
@@ -330,15 +363,28 @@ export async function updateMemberUsingSquashedPayload(
               description: org.organizationDescription,
               identities: org.identities ? org.identities : [],
             },
-          ).then((orgId) => {
-            // set the organization id for later use
-            org.organizationId = orgId
-            if (org.identities) {
-              for (const i of org.identities) {
-                i.organizationId = orgId
+          )
+            .then((orgId) => {
+              // set the organization id for later use
+              org.organizationId = orgId
+              if (org.identities) {
+                for (const i of org.identities) {
+                  i.organizationId = orgId
+                }
               }
-            }
-          }),
+              if (orgId) {
+                orgIdsToSync.push(orgId)
+              }
+            })
+            .then(() =>
+              Promise.all(
+                orgIdsToSync.map((orgId) =>
+                  syncOrganization(orgId).catch((error) => {
+                    console.error(`Failed to sync organization with ID ${orgId}:`, error)
+                  }),
+                ),
+              ),
+            ),
         )
       }
 
@@ -351,6 +397,7 @@ export async function updateMemberUsingSquashedPayload(
       const results = prepareWorkExperiences(
         existingMemberData.organizations,
         squashedPayload.memberOrganizations,
+        isHighConfidenceSourceSelectedForWorkExperiences,
       )
 
       if (results.toDelete.length > 0) {
@@ -388,19 +435,64 @@ export async function updateMemberUsingSquashedPayload(
       }
     }
 
+    await Promise.all(promises)
+
     if (updated) {
       await setMemberEnrichmentUpdateDateDb(tx.transaction(), memberId)
+      await syncMember(memberId)
     } else {
       await setMemberEnrichmentTryDateDb(tx.transaction(), memberId)
     }
 
-    await Promise.all(promises)
     svc.log.debug({ memberId }, 'Member sources processed successfully!')
 
     return updated
   })
 }
 
+export function doesIncomingOrgExistInExistingOrgs(
+  existingOrg: IMemberOrganizationData,
+  incomingOrg: IMemberEnrichmentDataNormalizedOrganization,
+): boolean {
+  // Check if any similar in existing work experiences
+  const incomingVerifiedPrimaryDomainIdentityValues = incomingOrg.identities
+    .filter((i) => i.type === OrganizationIdentityType.PRIMARY_DOMAIN && i.verified)
+    .map((i) => i.value)
+
+  const existingVerifiedPrimaryDomainIdentityValues = existingOrg.identities
+    .filter((i) => i.type === OrganizationIdentityType.PRIMARY_DOMAIN && i.verified)
+    .map((i) => i.value)
+
+  const incomingOrgStartDate = incomingOrg.startDate ? new Date(incomingOrg.startDate) : null
+  const incomingOrgEndDate = incomingOrg.endDate ? new Date(incomingOrg.endDate) : null
+  const existingOrgStartDate = existingOrg.dateStart ? new Date(existingOrg.dateStart) : null
+  const existingOrgEndEndDate = existingOrg.dateEnd ? new Date(existingOrg.dateEnd) : null
+
+  const isSameStartMonthYear =
+    (!incomingOrgStartDate && !existingOrgStartDate) || // Both start dates are null
+    (incomingOrgStartDate &&
+      existingOrgStartDate &&
+      incomingOrgStartDate.getMonth() === existingOrgStartDate.getMonth() &&
+      incomingOrgStartDate.getFullYear() === existingOrgStartDate.getFullYear())
+
+  const isSameEndMonthYear =
+    (!incomingOrgEndDate && !existingOrgEndEndDate) || // Both end dates are null
+    (incomingOrgEndDate &&
+      existingOrgEndEndDate &&
+      incomingOrgEndDate.getMonth() === existingOrgEndEndDate.getMonth() &&
+      incomingOrgEndDate.getFullYear() === existingOrgEndEndDate.getFullYear())
+
+  return (
+    hasIntersection(
+      incomingVerifiedPrimaryDomainIdentityValues,
+      existingVerifiedPrimaryDomainIdentityValues,
+    ) ||
+    ((existingOrg.orgName.toLowerCase().includes(incomingOrg.name.toLowerCase()) ||
+      incomingOrg.name.toLowerCase().includes(existingOrg.orgName.toLowerCase())) &&
+      ((isSameStartMonthYear && isSameEndMonthYear) || incomingOrg.title === existingOrg.jobTitle))
+  )
+}
+
 export async function setMemberEnrichmentTryDate(memberId: string): Promise {
   await setMemberEnrichmentTryDateDb(svc.postgres.writer.connection(), memberId)
 }
@@ -608,25 +700,32 @@ export async function findWhichLinkedinProfileToUseAmongScraperResult(
     }
   }
 
-  if (!categorized.selected && profilesFromUnverfiedIdentities.length > 0) {
-    const result = await findRelatedLinkedinProfilesWithLLM(
-      memberId,
-      memberData,
-      profilesFromUnverfiedIdentities,
-    )
+  if (profilesFromUnverfiedIdentities.length > 0) {
+    if (categorized.selected) {
+      // we already found a match from verified identities, discard all profiles from unverified identities
+      categorized.discarded = profilesFromUnverfiedIdentities
+    } else {
+      const result = await findRelatedLinkedinProfilesWithLLM(
+        memberId,
+        memberData,
+        profilesFromUnverfiedIdentities,
+      )
 
-    // check if empty object
-    if (result.profileIndex !== null) {
-      categorized.selected = profilesFromUnverfiedIdentities[result.profileIndex]
-      // add profiles not selected to discarded
-      for (let i = 0; i < profilesFromUnverfiedIdentities.length; i++) {
-        if (i !== result.profileIndex) {
-          categorized.discarded.push(profilesFromUnverfiedIdentities[i])
+      // check if empty object
+      if (result.profileIndex !== null) {
+        if (!categorized.selected) {
+          categorized.selected = profilesFromUnverfiedIdentities[result.profileIndex]
+        }
+        // add profiles not selected to discarded
+        for (let i = 0; i < profilesFromUnverfiedIdentities.length; i++) {
+          if (i !== result.profileIndex) {
+            categorized.discarded.push(profilesFromUnverfiedIdentities[i])
+          }
         }
+      } else {
+        // if no match found, we should discard all profiles from verified identities
+        categorized.discarded = profilesFromUnverfiedIdentities
       }
-    } else {
-      // if no match found, we should discard all profiles from verified identities
-      categorized.discarded = profilesFromUnverfiedIdentities
     }
   }
 
@@ -758,14 +857,25 @@ interface IWorkExperienceChanges {
 function prepareWorkExperiences(
   oldVersion: IMemberOrganizationData[],
   newVersion: IMemberEnrichmentDataNormalizedOrganization[],
+  isHighConfidenceSourceSelectedForWorkExperiences: boolean,
 ): IWorkExperienceChanges {
   // we delete all the work experiences that were not manually created
-  const toDelete = oldVersion.filter((c) => c.source !== OrganizationSource.UI)
+  let toDelete = oldVersion.filter((c) => c.source !== OrganizationSource.UI)
 
   const toCreate: IMemberEnrichmentDataNormalizedOrganization[] = []
   // eslint-disable-next-line @typescript-eslint/no-explicit-any
   const toUpdate: Map> = new Map()
 
+  if (isHighConfidenceSourceSelectedForWorkExperiences) {
+    toDelete = oldVersion
+    toCreate.push(...newVersion)
+    return {
+      toDelete,
+      toCreate,
+      toUpdate,
+    }
+  }
+
   // sort both versions by start date and only use manual changes from the current version
   const orderedCurrentVersion = oldVersion
     .filter((c) => c.source === OrganizationSource.UI)
@@ -778,6 +888,7 @@ function prepareWorkExperiences(
       // Compare dates if both values exist
       return new Date(a.dateStart as string).getTime() - new Date(b.dateStart as string).getTime()
     })
+
   let orderedNewVersion = newVersion.sort((a, b) => {
     // If either value is null/undefined, move it to the beginning
     if (!a.startDate && !b.startDate) return 0
@@ -796,46 +907,35 @@ function prepareWorkExperiences(
   // we iterate through the existing version experiences to see if update is needed
   for (const current of orderedCurrentVersion) {
     // try and find a matching experience in the new versions by title
-    let match = orderedNewVersion.find(
+    const match = orderedNewVersion.find(
       (e) =>
         e.title === current.jobTitle &&
        e.identities &&
        e.identities.some((e) => e.organizationId === current.orgId),
     )
-    if (!match) {
-      // if we didn't find a match by title we should check dates
-      match = orderedNewVersion.find(
-        (e) =>
-          dateIntersects(current.dateStart, current.dateEnd, e.startDate, e.endDate) &&
-          e.identities &&
-          e.identities.some((e) => e.organizationId === current.orgId),
-      )
-    }
 
     // if we found a match we can check if we need something to update
-    if (match) {
-      // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    if (
+      match &&
+      current.dateStart === match.startDate &&
+      current.dateEnd === null &&
+      match.endDate !== null
+    ) {
       const toUpdateInner: Record = {}
-      // lets check if the dates and title are the same otherwise we need to update them
-      if (current.dateStart !== match.startDate) {
-        toUpdateInner.dateStart = match.startDate
-      }
-
-      if (current.dateEnd !== match.endDate) {
-        toUpdateInner.dateEnd = match.endDate
-      }
-
-      if (current.jobTitle !== match.title) {
-        toUpdateInner.title = match.title
-      }
-
-      if (Object.keys(toUpdateInner).length > 0) {
-        toUpdate.set(current, toUpdateInner)
-      }
+      toUpdateInner.dateEnd = match.endDate
+      toUpdate.set(current, toUpdateInner)
 
       // remove the match from the new version array so we later don't process it again
       orderedNewVersion = orderedNewVersion.filter((e) => e.id !== match.id)
+    } else if (
+      match &&
+      (current.dateStart !== match.startDate || current.dateEnd !== null || match.endDate === null)
+    ) {
+      // there's an incoming work experiences, but it's conflicting with the existing manually updated data
+      // we shouldn't add or update anything when this happens
+      // we can only update dateEnd of existing manually changed data, when it has a null dateEnd
+      orderedNewVersion = orderedNewVersion.filter((e) => e.id !== match.id)
     }
     // if we didn't find a match we should just leave it as it is in the database since it was manual input
   }
@@ -850,26 +950,20 @@ function prepareWorkExperiences(
   }
 }
 
-function dateIntersects(
-  d1Start?: string | null,
-  d1End?: string | null,
-  d2Start?: string | null,
-  d2End?: string | null,
-): boolean {
-  // If both periods have no dates at all, we can't determine intersection
-  if ((!d1Start && !d1End) || (!d2Start && !d2End)) {
-    return false
-  }
+export async function syncMember(memberId: string): Promise {
+  const syncApi = new SearchSyncApiClient({
+    baseUrl: process.env['CROWD_SEARCH_SYNC_API_URL'],
+  })
 
-  // Convert strings to timestamps, using fallbacks for missing dates
-  const start1 = d1Start ? new Date(d1Start).getTime() : -Infinity
-  const end1 = d1End ? new Date(d1End).getTime() : Infinity
-  const start2 = d2Start ? new Date(d2Start).getTime() : -Infinity
-  const end2 = d2End ? new Date(d2End).getTime() : Infinity
+  await syncApi.triggerMemberSync(memberId, { withAggs: false })
+}
+
+export async function syncOrganization(organizationId: string): Promise {
+  const syncApi = new SearchSyncApiClient({
+    baseUrl: process.env['CROWD_SEARCH_SYNC_API_URL'],
+  })
 
-  // Periods intersect if one period's start is before other period's end
-  // and that same period's end is after the other period's start
-  return start1 <= end2 && end1 >= start2
+  await syncApi.triggerOrganizationSync(organizationId, undefined, { withAggs: false })
 }
 
 export async function cleanAttributeValue(
diff --git a/services/apps/members_enrichment_worker/src/bin/onboarding.ts b/services/apps/members_enrichment_worker/src/bin/onboarding.ts
index 31f1e6daa4..64df3cb9a6 100644
--- a/services/apps/members_enrichment_worker/src/bin/onboarding.ts
+++ b/services/apps/members_enrichment_worker/src/bin/onboarding.ts
@@ -27,7 +27,7 @@ const tenantId = processArguments[0]
 
 const minMemberActivities = 100
 const maxConcurrentProcessing = 5
-const maxMembersToProcess = 1000
+const maxMembersToProcess = Infinity
 
 async function getEnrichableMembers(limit: number): Promise {
   const query = `
diff --git a/services/apps/members_enrichment_worker/src/sources/clearbit/service.ts b/services/apps/members_enrichment_worker/src/sources/clearbit/service.ts
index 0345481114..60efc4c8dc 100644
--- a/services/apps/members_enrichment_worker/src/sources/clearbit/service.ts
+++ b/services/apps/members_enrichment_worker/src/sources/clearbit/service.ts
@@ -1,5 +1,6 @@
 import axios from 'axios'
 
+import { replaceDoubleQuotes } from '@crowd/common'
 import { Logger, LoggerBase } from '@crowd/logging'
 import {
   MemberAttributeName,
@@ -216,10 +217,10 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn
       }
 
       normalized.memberOrganizations.push({
-        name: data.employment.name,
+        name: replaceDoubleQuotes(data.employment.name),
         source: OrganizationSource.ENRICHMENT_CLEARBIT,
         identities: orgIdentities,
-        title: data.employment.title,
+        title: replaceDoubleQuotes(data.employment.title),
         startDate: null,
         endDate: null,
       })
diff --git a/services/apps/members_enrichment_worker/src/sources/crustdata/service.ts b/services/apps/members_enrichment_worker/src/sources/crustdata/service.ts
index 195fe8ce82..f0c5bc5f23 100644
--- a/services/apps/members_enrichment_worker/src/sources/crustdata/service.ts
+++ b/services/apps/members_enrichment_worker/src/sources/crustdata/service.ts
@@ -1,6 +1,6 @@
 import axios from 'axios'
 
-import { isEmail } from '@crowd/common'
+import { isEmail, replaceDoubleQuotes } from '@crowd/common'
 import { Logger, LoggerBase } from '@crowd/logging'
 import {
   IMemberEnrichmentCache,
@@ -352,13 +352,15 @@ export default class EnrichmentServiceCrustdata extends LoggerBase implements IE
         }
 
         normalized.memberOrganizations.push({
-          name: workExperience.employer_name,
+          name: replaceDoubleQuotes(workExperience.employer_name),
           source: OrganizationSource.ENRICHMENT_CRUSTDATA,
           identities,
-          title: workExperience.employee_title,
+          title: replaceDoubleQuotes(workExperience.employee_title),
           startDate: workExperience?.start_date ?? null,
           endDate: workExperience?.end_date ?? null,
-          organizationDescription: workExperience.employer_linkedin_description,
+          organizationDescription: replaceDoubleQuotes(
+            workExperience.employer_linkedin_description,
+          ),
         })
       }
     }
diff --git a/services/apps/members_enrichment_worker/src/sources/progai/service.ts b/services/apps/members_enrichment_worker/src/sources/progai/service.ts
index 65b07427b4..584fa4d5d6 100644
--- a/services/apps/members_enrichment_worker/src/sources/progai/service.ts
+++ b/services/apps/members_enrichment_worker/src/sources/progai/service.ts
@@ -1,7 +1,7 @@
 import axios from 'axios'
 import lodash from 'lodash'
 
-import { websiteNormalizer } from '@crowd/common'
+import { replaceDoubleQuotes, websiteNormalizer } from '@crowd/common'
 import { Logger, LoggerBase } from '@crowd/logging'
 import {
   MemberAttributeName,
@@ -261,51 +261,80 @@ export default class EnrichmentServiceProgAI extends LoggerBase implements IEnri
   ): IMemberEnrichmentDataNormalized {
     if (data.work_experiences) {
       for (const workExperience of data.work_experiences) {
-        const identities = []
-
-        if (workExperience.companyUrl) {
-          const normalizedDomain = websiteNormalizer(workExperience.companyUrl, false)
+        if (
+          workExperience.company !== null ||
+          workExperience.companyUrl !== null ||
+          workExperience.companyLinkedInUrl !== null
+        ) {
+          const identities = []
+          let hasPrimaryDomainIdentity = false
+
+          if (workExperience.companyUrl) {
+            const normalizedDomain = websiteNormalizer(workExperience.companyUrl, false)
+
+            // sometimes companyUrl is a github link, we don't want to add it as a primary domain
+            if (
+              normalizedDomain &&
+              !workExperience.companyUrl.toLowerCase().includes('github') &&
+              !(workExperience.company || '').toLowerCase().includes('github')
+            ) {
+              identities.push({
+                platform: PlatformType.LINKEDIN,
+                value: normalizedDomain,
+                type: OrganizationIdentityType.PRIMARY_DOMAIN,
+                verified: true,
+              })
+              hasPrimaryDomainIdentity = true
+            }
+          }
 
-          // sometimes companyUrl is a github link, we don't want to add it as a primary domain
           if (
-            normalizedDomain &&
-            !workExperience.companyUrl.toLowerCase().includes('github') &&
-            !workExperience.company.toLowerCase().includes('github')
+            workExperience.companyLinkedInUrl &&
+            this.getLinkedInProfileHandle(workExperience.companyLinkedInUrl)
           ) {
             identities.push({
               platform: PlatformType.LINKEDIN,
-              value: normalizedDomain,
-              type: OrganizationIdentityType.PRIMARY_DOMAIN,
-              verified: true,
+              value: this.getLinkedInProfileHandle(workExperience.companyLinkedInUrl),
+              type: OrganizationIdentityType.USERNAME,
+              verified: !hasPrimaryDomainIdentity,
             })
           }
-        }
 
-        if (workExperience.companyLinkedInUrl) {
-          identities.push({
-            platform: PlatformType.LINKEDIN,
-            value: `company:${workExperience.companyLinkedInUrl.split('/').pop()}`,
-            type: OrganizationIdentityType.USERNAME,
-            verified: true,
+          normalized.memberOrganizations.push({
+            name: replaceDoubleQuotes(workExperience.company),
+            source: OrganizationSource.ENRICHMENT_PROGAI,
+            identities,
+            title: replaceDoubleQuotes(workExperience.title),
+            startDate: workExperience.startDate
+              ? workExperience.startDate.replace('Z', '+00:00')
+              : null,
+            endDate: workExperience.endDate ? workExperience.endDate.replace('Z', '+00:00') : null,
           })
         }
-
-        normalized.memberOrganizations.push({
-          name: workExperience.company,
-          source: OrganizationSource.ENRICHMENT_PROGAI,
-          identities,
-          title: workExperience.title,
-          startDate: workExperience.startDate
-            ? workExperience.startDate.replace('Z', '+00:00')
-            : null,
-          endDate: workExperience.endDate ? workExperience.endDate.replace('Z', '+00:00') : null,
-        })
       }
     }
 
     return normalized
   }
 
+  private getLinkedInProfileHandle(url: string): string | null {
+    let regex = /company\/([^/]+)/
+    let match = url.match(regex)
+
+    if (match) {
+      return `company:${match[1]}`
+    }
+
+    regex = /school\/([^/]+)/
+    match = url.match(regex)
+
+    if (match) {
+      return `school:${match[1]}`
+    }
+
+    return null
+  }
+
   async getDataUsingGitHubHandle(githubUsername: string): Promise {
     const url = `${process.env['CROWD_ENRICHMENT_PROGAI_URL']}/get_profile`
     const config = {
diff --git a/services/apps/members_enrichment_worker/src/sources/serp/service.ts b/services/apps/members_enrichment_worker/src/sources/serp/service.ts
index 6a93a08042..5cca50b3dd 100644
--- a/services/apps/members_enrichment_worker/src/sources/serp/service.ts
+++ b/services/apps/members_enrichment_worker/src/sources/serp/service.ts
@@ -128,13 +128,19 @@ export default class EnrichmentServiceSerpApi extends LoggerBase implements IEnr
           platform: PlatformType.LINKEDIN,
           type: MemberIdentityType.USERNAME,
           verified: false,
-          value: this.normalizeLinkedUrl(data.linkedinUrl).split('/').pop(),
+          value: this.getLinkedInProfileHandle(this.normalizeLinkedUrl(data.linkedinUrl)),
         },
       ],
     }
 
     return normalized
   }
 
+  private getLinkedInProfileHandle(url: string): string | null {
+    const regex = /in\/([^/]+)/
+    const match = url.match(regex)
+    return match ? match[1] : null
+  }
+
   private normalizeLinkedUrl(url: string): string {
     try {
       const parsedUrl = new URL(url)
diff --git a/services/apps/members_enrichment_worker/src/workflows/enrichMember.ts b/services/apps/members_enrichment_worker/src/workflows/enrichMember.ts
index 16088ac8f5..88cbae8787 100644
--- a/services/apps/members_enrichment_worker/src/workflows/enrichMember.ts
+++ b/services/apps/members_enrichment_worker/src/workflows/enrichMember.ts
@@ -21,6 +21,7 @@ const {
   updateMemberEnrichmentCache,
   isCacheObsolete,
   getEnrichmentInput,
+  hasRemainingCredits,
 } = proxyActivities({
   startToCloseTimeout: '1 minute',
   retry: {
@@ -46,6 +47,12 @@ export async function enrichMember(
     if (await isCacheObsolete(source, cache)) {
       const enrichmentInput: IEnrichmentSourceInput = await getEnrichmentInput(input)
 
+      if (!(await hasRemainingCredits(source))) {
+        // no credits remaining, only update cache.updatedAt and keep the old data
+        await touchMemberEnrichmentCacheUpdatedAt(source, input.id)
+        continue
+      }
+
       const data = await getEnrichmentData(source, enrichmentInput)
 
       if (!cache) {
diff --git a/services/apps/members_enrichment_worker/src/workflows/processMemberSources.ts b/services/apps/members_enrichment_worker/src/workflows/processMemberSources.ts
index d573c1ac14..0e0dfb2e15 100644
--- a/services/apps/members_enrichment_worker/src/workflows/processMemberSources.ts
+++ b/services/apps/members_enrichment_worker/src/workflows/processMemberSources.ts
@@ -233,7 +233,7 @@ export async function processMemberSources(args: IProcessMemberSourcesArgs): Pro
       for (const attribute of Object.keys(multipleValueAttributesSquashed)) {
         if (multipleValueAttributesSquashed[attribute]) {
           attributesSquashed[attribute] = {
-            enrichment: multipleValueAttributesSquashed[attribute],
+            enrichment: await cleanAttributeValue(multipleValueAttributesSquashed[attribute]),
           }
         }
       }
@@ -265,6 +265,21 @@ export async function processMemberSources(args: IProcessMemberSourcesArgs): Pro
           args.memberId,
           workExperienceDataInDifferentSources,
         )
+        // if there are multiple verified identities in work experiences, we reduce it
+        // to one because in our db they might exist in different organizations and
+        // might need a merge. To avoid this, we'll only send the org with one verified identity
+        workExperiencesSquashedByLLM.forEach((we) => {
+          let found = false
+          we.identities = (we.identities || []).map((i) => {
+            if (i.verified && !found) {
+              found = true
+              return i
+            } else if (i.verified) {
+              return { ...i, verified: false }
+            }
+            return i
+          })
+        })
         squashedPayload.memberOrganizations = workExperiencesSquashedByLLM
       }
     }
@@ -279,6 +294,7 @@ export async function processMemberSources(args: IProcessMemberSourcesArgs): Pro
       existingMemberData,
       squashedPayload,
       progaiLinkedinScraperProfileSelected && hasContributions,
+      !!crustDataProfileSelected,
     )
 
     return memberUpdated
diff --git a/services/libs/common/src/array.ts b/services/libs/common/src/array.ts
index 7bb53f773a..5f84d630ee 100644
--- a/services/libs/common/src/array.ts
+++ b/services/libs/common/src/array.ts
@@ -129,3 +129,8 @@ export const areArraysEqual = (a: T[], b: T[]): boolean => {
 export const firstArrayContainsSecondArray = (array1: T[], array2: T[]): boolean => {
   return array2.every((val) => array1.includes(val))
 }
+
+export const hasIntersection = (arr1: string[], arr2: string[]): boolean => {
+  const set1 = new Set(arr1)
+  return arr2.some((item) => set1.has(item))
+}
diff --git a/services/libs/common/src/utils.ts b/services/libs/common/src/utils.ts
index 7bc376b7b4..0234a7fc6c 100644
--- a/services/libs/common/src/utils.ts
+++ b/services/libs/common/src/utils.ts
@@ -73,7 +73,7 @@ export const redactNullByte = (str: string | null | undefined): string =>
   str ? str.replace(/\\u0000|\0/g, '[NULL]') : ''
 
 export const replaceDoubleQuotes = (str: string | null | undefined): string =>
-  str ? str.replace(/"/g, "'") : ''
+  str ? str.replace(/[\u201C\u201D\u0022\u201E\u201F\u2033\u2036"]/g, "'") : ''
 
 export const dateEqualityChecker = (a, b) => {
   if (a instanceof Date) {
diff --git a/services/libs/common_services/src/services/llm.service.ts b/services/libs/common_services/src/services/llm.service.ts
index 5b4ba5a26f..ca60a1690a 100644
--- a/services/libs/common_services/src/services/llm.service.ts
+++ b/services/libs/common_services/src/services/llm.service.ts
@@ -123,7 +123,7 @@ export class LlmService extends LoggerBase {
     const outputCost = (outputTokenCount / 1000) * pricing.costPer1000OutputTokens
     const totalCost = inputCost + outputCost
 
-    this.log.info({ type, entityId, inputCost, outputCost, totalCost }, 'Estimated LLM cost!')
+    this.log.debug({ type, entityId, inputCost, outputCost, totalCost }, 'Estimated LLM cost!')
 
     const result = {
       prompt,
diff --git a/services/libs/data-access-layer/src/old/apps/members_enrichment_worker/index.ts b/services/libs/data-access-layer/src/old/apps/members_enrichment_worker/index.ts
index cc211bbd03..5b663051f5 100644
--- a/services/libs/data-access-layer/src/old/apps/members_enrichment_worker/index.ts
+++ b/services/libs/data-access-layer/src/old/apps/members_enrichment_worker/index.ts
@@ -20,19 +20,23 @@ export async function fetchMemberDataForLLMSquashing(
 ): Promise {
   const result = await db.oneOrNone(
     `
-    with member_orgs as (select distinct mo."memberId",
-                                         mo."organizationId" as "orgId",
-                                         o."displayName" as "orgName",
-                                         mo.title as "jobTitle",
-                                         mo.id,
-                                         mo."dateStart",
-                                         mo."dateEnd",
-                                         mo.source
+    with member_orgs as (select
+          distinct mo."memberId",
+          mo."organizationId" as "orgId",
+          o."displayName" as "orgName",
+          mo.title as "jobTitle",
+          mo.id,
+          mo."dateStart",
+          mo."dateEnd",
+          mo.source,
+          jsonb_agg(oi) as identities
                          from "memberOrganizations" mo
-                                  inner join organizations o on mo."organizationId" = o.id
+                         inner join organizations o on mo."organizationId" = o.id
+                         inner join "organizationIdentities" oi on oi."organizationId" = o.id
                          where mo."memberId" = $(memberId)
                            and mo."deletedAt" is null
-                           and o."deletedAt" is null)
+                           and o."deletedAt" is null
+                         group by mo."memberId", mo."organizationId", o."displayName", mo.id)
     select m."displayName",
            m.attributes,
            m."manuallyChangedFields",
@@ -46,8 +50,7 @@ export async function fetchMemberDataForLLMSquashing(
                                    mi.value) r)
               ) from "memberIdentities" mi
-              where mi."memberId" = m.id
-                and verified = true), '[]'::json) as identities,
+              where mi."memberId" = m.id), '[]'::json) as identities,
       case
         when exists (select 1 from member_orgs where "memberId" = m.id)
           then (
@@ -59,7 +62,8 @@ export async function fetchMemberDataForLLMSquashing(
                             mo."jobTitle",
                             mo."dateStart",
                             mo."dateEnd",
-                            mo.source) r)
+                            mo.source,
+                            mo.identities) r)
                 ) from member_orgs mo
                where mo."memberId" = m.id
@@ -136,8 +140,9 @@ export async function fetchMembersForEnrichment(
       INNER JOIN tenants ON tenants.id = members."tenantId"
      INNER JOIN "memberIdentities" mi ON mi."memberId" = members.id
      LEFT JOIN "membersGlobalActivityCount" ON "membersGlobalActivityCount"."memberId" = members.id
-      WHERE 
+      WHERE
        ${enrichableBySqlJoined}
+        AND coalesce((m.attributes ->'isBot'->>'default')::boolean, false) = false
        AND tenants."deletedAt" IS NULL
        AND members."deletedAt" IS NULL
        AND (${cacheAgeInnerQueryItems.join(' OR ')})
diff --git a/services/libs/data-access-layer/src/organizations/organizations.ts b/services/libs/data-access-layer/src/organizations/organizations.ts
index 2b2bb5f3da..0f64dc8c39 100644
--- a/services/libs/data-access-layer/src/organizations/organizations.ts
+++ b/services/libs/data-access-layer/src/organizations/organizations.ts
@@ -156,6 +156,42 @@ export async function findOrgByName(
   return result
 }
 
+export async function findOrgByVerifiedDomain(
+  qx: QueryExecutor,
+  tenantId: string,
+  identity: IDbOrgIdentity,
+): Promise {
+  if (identity.type !== OrganizationIdentityType.PRIMARY_DOMAIN) {
+    throw new Error('Invalid identity type')
+  }
+  const result = await qx.selectOneOrNone(
+    `
+      with "organizationsWithIdentity" as (
+        select oi."organizationId"
+        from "organizationIdentities" oi
+        where
+          oi."tenantId" = $(tenantId)
+          and lower(oi.value) = lower($(value))
+          and oi.type = $(type)
+          and oi.verified = true
+      )
+      select ${prepareSelectColumns(ORG_SELECT_COLUMNS, 'o')}
+      from organizations o
+      where o."tenantId" = $(tenantId)
+        and o.id in (select distinct "organizationId" from "organizationsWithIdentity")
+      limit 1;
+    `,
+    {
+      tenantId,
+      value: identity.value,
+      platform: identity.platform,
+      type: identity.type,
+    },
+  )
+
+  return result
+}
+
 export async function findOrgByVerifiedIdentity(
   qx: QueryExecutor,
   tenantId: string,
@@ -501,6 +537,11 @@ export async function findOrCreateOrganization(
   // find existing org by sent verified identities
   for (const identity of verifiedIdentities) {
     existing = await findOrgByVerifiedIdentity(qe, tenantId, identity)
+
+    if (!existing && identity.type === OrganizationIdentityType.PRIMARY_DOMAIN) {
+      // if primary domain isn't found in the incoming platform, check if the domain exists in any platform
+      existing = await findOrgByVerifiedDomain(qe, tenantId, identity)
+    }
     if (existing) {
       break
     }
@@ -513,7 +554,7 @@ export async function findOrCreateOrganization(
   let id
 
   if (!existing && verifiedIdentities.length === 0) {
-    log.warn(
+    log.debug(
       { tenantId },
       'Organization does not have any verified identities and was not found by name so we will not create it.',
     )
diff --git a/services/libs/opensearch/src/service/organization.sync.service.ts b/services/libs/opensearch/src/service/organization.sync.service.ts
index 402621db13..91bc88db27 100644
--- a/services/libs/opensearch/src/service/organization.sync.service.ts
+++ b/services/libs/opensearch/src/service/organization.sync.service.ts
@@ -414,9 +414,12 @@ export class OrganizationSyncService {
         OrganizationField.LOCATION,
         OrganizationField.INDUSTRY,
       ])
-      const data = await buildFullOrgForMergeSuggestions(qx, base)
-      const prefixed = OrganizationSyncService.prefixData(data)
-      await this.openSearchService.index(orgId, OpenSearchIndex.ORGANIZATIONS, prefixed)
+
+      if (base) {
+        const data = await buildFullOrgForMergeSuggestions(qx, base)
+        const prefixed = OrganizationSyncService.prefixData(data)
+        await this.openSearchService.index(orgId, OpenSearchIndex.ORGANIZATIONS, prefixed)
+      }
     }
 
     return {
diff --git a/services/libs/types/src/enrichment.ts b/services/libs/types/src/enrichment.ts
index 07005a8767..a5e67668f0 100644
--- a/services/libs/types/src/enrichment.ts
+++ b/services/libs/types/src/enrichment.ts
@@ -1,5 +1,6 @@
 import { MemberEnrichmentSource } from './enums'
 import { IMemberIdentity, IMemberReach } from './members'
+import { IOrganizationIdentity } from './organizations'
 
 export interface IMemberEnrichmentCache {
   createdAt: string
@@ -39,6 +40,7 @@ export interface IMemberOrganizationData {
   dateStart: string
   dateEnd: string
   source: string
+  identities?: IOrganizationIdentity[]
 }
 
 export interface IMemberOriginalData {
diff --git a/services/libs/types/src/enums/llm.ts b/services/libs/types/src/enums/llm.ts
index 28f707c36a..7d28970db5 100644
--- a/services/libs/types/src/enums/llm.ts
+++ b/services/libs/types/src/enums/llm.ts
@@ -1,5 +1,6 @@
 export enum LlmModelType {
   CLAUDE_3_5_SONNET = 'anthropic.claude-3-5-sonnet-20240620-v1:0',
+  CLAUDE_3_5_SONNET_V2 = 'anthropic.claude-3-5-sonnet-20241022-v2:0',
   CLAUDE_3_OPUS = 'anthropic.claude-3-opus-20240229-v1:0',
 }
 
diff --git a/services/libs/types/src/llm.ts b/services/libs/types/src/llm.ts
index f2bf310839..677a0fe952 100644
--- a/services/libs/types/src/llm.ts
+++ b/services/libs/types/src/llm.ts
@@ -30,6 +30,7 @@ export interface ILlmPricing {
 export const LLM_MODEL_REGION_MAP: Record = {
   [LlmModelType.CLAUDE_3_OPUS]: 'us-west-2',
   [LlmModelType.CLAUDE_3_5_SONNET]: 'us-east-1',
+  [LlmModelType.CLAUDE_3_5_SONNET_V2]: 'us-west-2',
 }
 
 // to estimate costs - these numbers can change
@@ -42,6 +43,10 @@ export const LLM_MODEL_PRICING_MAP: Record = {
     costPer1000InputTokens: 0.003,
     costPer1000OutputTokens: 0.015,
   },
+  [LlmModelType.CLAUDE_3_5_SONNET_V2]: {
+    costPer1000InputTokens: 0.003,
+    costPer1000OutputTokens: 0.015,
+  },
 }
 
 export const LLM_SETTINGS: Record = {
@@ -54,7 +59,7 @@ export const LLM_SETTINGS: Record = {
     },
   },
   [LlmQueryType.MEMBER_ENRICHMENT_FIND_RELATED_LINKEDIN_PROFILES]: {
-    modelId: LlmModelType.CLAUDE_3_5_SONNET,
+    modelId: LlmModelType.CLAUDE_3_5_SONNET_V2,
     arguments: {
       max_tokens: 200000,
       anthropic_version: 'bedrock-2023-05-31',
@@ -62,7 +67,7 @@ export const LLM_SETTINGS: Record = {
     },
   },
   [LlmQueryType.MEMBER_ENRICHMENT_SQUASH_MULTIPLE_VALUE_ATTRIBUTES]: {
-    modelId: LlmModelType.CLAUDE_3_5_SONNET,
+    modelId: LlmModelType.CLAUDE_3_5_SONNET_V2,
     arguments: {
       max_tokens: 200000,
      anthropic_version: 'bedrock-2023-05-31',
@@ -70,7 +75,7 @@ export const LLM_SETTINGS: Record = {
     },
   },
   [LlmQueryType.MEMBER_ENRICHMENT_SQUASH_WORK_EXPERIENCES_FROM_MULTIPLE_SOURCES]: {
-    modelId: LlmModelType.CLAUDE_3_5_SONNET,
+    modelId: LlmModelType.CLAUDE_3_5_SONNET_V2,
     arguments: {
       max_tokens: 200000,
       anthropic_version: 'bedrock-2023-05-31',