diff --git a/services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts b/services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts
index 845e963628..2e11ee92cb 100644
--- a/services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts
+++ b/services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts
@@ -1,6 +1,7 @@
 import { findMemberIdentityWithTheMostActivityInPlatform as findMemberIdentityWithTheMostActivityInPlatformQuestDb } from '@crowd/data-access-layer/src/activities'
 import {
   findMemberEnrichmentCacheDb,
+  findMemberEnrichmentCacheForAllSourcesDb,
   insertMemberEnrichmentCacheDb,
   touchMemberEnrichmentCacheUpdatedAtDb,
   updateMemberEnrichmentCacheDb,
@@ -42,7 +43,7 @@ export async function getEnrichmentData(
 export async function normalizeEnrichmentData(
   source: MemberEnrichmentSource,
   data: IMemberEnrichmentData,
-): Promise<IMemberEnrichmentDataNormalized> {
+): Promise<IMemberEnrichmentDataNormalized | IMemberEnrichmentDataNormalized[]> {
   const service = EnrichmentSourceServiceFactory.getEnrichmentSourceService(source, svc.log)
   return service.normalize(data)
 }
@@ -100,6 +101,12 @@ export async function findMemberEnrichmentCache(
   return findMemberEnrichmentCacheDb(svc.postgres.reader.connection(), memberId, source)
 }
 
+export async function findMemberEnrichmentCacheForAllSources(
+  memberId: string,
+): Promise<IMemberEnrichmentCache<IMemberEnrichmentData>[]> {
+  return findMemberEnrichmentCacheForAllSourcesDb(svc.postgres.reader.connection(), memberId)
+}
+
 export async function insertMemberEnrichmentCache(
   source: MemberEnrichmentSource,
   memberId: string,
diff --git a/services/apps/premium/members_enrichment_worker/src/factory.ts b/services/apps/premium/members_enrichment_worker/src/factory.ts
index fe5054efe8..6ea8143162 100644
--- a/services/apps/premium/members_enrichment_worker/src/factory.ts
+++ b/services/apps/premium/members_enrichment_worker/src/factory.ts
@@ -3,6 +3,7 @@ import { Logger } from '@crowd/logging'
 import { MemberEnrichmentSource } from '@crowd/types'
 
 import EnrichmentServiceClearbit from './sources/clearbit/service'
+import EnrichmentServiceProgAILinkedinScraper from './sources/progai-linkedin-scraper/service'
 import EnrichmentServiceProgAI from './sources/progai/service'
 import EnrichmentServiceSerpApi from './sources/serp/service'
 import { IEnrichmentService } from './types'
@@ -24,6 +25,8 @@ export class EnrichmentSourceServiceFactory {
         return new EnrichmentServiceClearbit(log)
       case MemberEnrichmentSource.SERP:
         return new EnrichmentServiceSerpApi(log)
+      case MemberEnrichmentSource.PROGAI_LINKEDIN_SCRAPER:
+        return new EnrichmentServiceProgAILinkedinScraper(log)
       default:
         throw new Error(`Enrichment service for ${source} is not found!`)
     }
diff --git a/services/apps/premium/members_enrichment_worker/src/sources/clearbit/service.ts b/services/apps/premium/members_enrichment_worker/src/sources/clearbit/service.ts
index 7af642ac91..6d5884b84b 100644
--- a/services/apps/premium/members_enrichment_worker/src/sources/clearbit/service.ts
+++ b/services/apps/premium/members_enrichment_worker/src/sources/clearbit/service.ts
@@ -56,7 +56,7 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn
     super(log)
   }
 
-  isEnrichableBySource(input: IEnrichmentSourceInput): boolean {
+  async isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean> {
     return !!input.email?.value && input.email?.verified
   }
 
diff --git a/services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/service.ts b/services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/service.ts
new file mode 100644
index 0000000000..29f95d4187
--- /dev/null
+++ b/services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/service.ts
@@ -0,0 +1,227 @@
+import axios from 'axios'
+
+import { Logger, LoggerBase } from '@crowd/logging'
+import {
+  IMemberEnrichmentCache,
+  IMemberIdentity,
+  MemberEnrichmentSource,
+  PlatformType,
+} from '@crowd/types'
+
+import { findMemberEnrichmentCacheForAllSources } from '../../activities/enrichment'
+import { EnrichmentSourceServiceFactory } from '../../factory'
+import {
+  IEnrichmentService,
+  IEnrichmentSourceInput,
+  IMemberEnrichmentData,
+  IMemberEnrichmentDataNormalized,
+} from '../../types'
+import { IMemberEnrichmentDataProgAI, IMemberEnrichmentDataProgAIResponse } from '../progai/types'
+
+import { IMemberEnrichmentDataProgAILinkedinScraper } from './types'
+
+/**
+ * Enrichment source that queries ProgAI by linkedin handle. Handles come from the member's
+ * verified linkedin identity and from linkedin identities found in the normalized cache rows
+ * of the sources listed in `alsoFindInputsInSourceCaches`.
+ */
+export default class EnrichmentServiceProgAILinkedinScraper
+  extends LoggerBase
+  implements IEnrichmentService
+{
+  public source: MemberEnrichmentSource = MemberEnrichmentSource.PROGAI_LINKEDIN_SCRAPER
+  public platform = `enrichment-${this.source}`
+
+  public alsoFindInputsInSourceCaches: MemberEnrichmentSource[] = [
+    MemberEnrichmentSource.PROGAI,
+    MemberEnrichmentSource.CLEARBIT,
+    MemberEnrichmentSource.SERP,
+  ]
+
+  public enrichableBySql = `(mi.verified AND mi.type = 'username' and mi.platform = 'linkedin')`
+
+  public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 90
+
+  constructor(public readonly log: Logger) {
+    super(log)
+  }
+
+  /**
+   * A member is enrichable when the input carries a verified linkedin identity, or when any
+   * cache row of a source in `alsoFindInputsInSourceCaches` normalizes to a linkedin identity.
+   */
+  async isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean> {
+    const caches = await findMemberEnrichmentCacheForAllSources(input.memberId)
+
+    let hasEnrichableLinkedinInCache = false
+    for (const cache of caches) {
+      if (this.alsoFindInputsInSourceCaches.includes(cache.source)) {
+        const service = EnrichmentSourceServiceFactory.getEnrichmentSourceService(
+          cache.source,
+          this.log,
+        )
+        const normalized = service.normalize(cache.data) as IMemberEnrichmentDataNormalized
+        // stop only on a hit: a cache row without a linkedin identity must not end the scan
+        if (normalized?.identities?.some((i) => i.platform === PlatformType.LINKEDIN)) {
+          hasEnrichableLinkedinInCache = true
+          break
+        }
+      }
+    }
+
+    // coerce to a real boolean, the bare && chain can yield undefined or the handle string
+    return (
+      hasEnrichableLinkedinInCache ||
+      !!(input.linkedin && input.linkedin.value && input.linkedin.verified)
+    )
+  }
+
+  /** Always resolves to true. */
+  async hasRemainingCredits(): Promise<boolean> {
+    return true
+  }
+
+  /**
+   * Collects distinct linkedin handles from the given cache rows and from the verified input
+   * identity. `repeatedTimesInDifferentSources` counts how many of those reported the handle,
+   * `isFromVerifiedSource` is true only when the handle matches the verified input identity.
+   */
+  private async findConsumableLinkedinIdentities(
+    input: IEnrichmentSourceInput,
+    caches: IMemberEnrichmentCache<IMemberEnrichmentData>[],
+  ): Promise<
+    (IMemberIdentity & { repeatedTimesInDifferentSources: number; isFromVerifiedSource: boolean })[]
+  > {
+    const consumableIdentities: (IMemberIdentity & {
+      repeatedTimesInDifferentSources: number
+      isFromVerifiedSource: boolean
+    })[] = []
+    const linkedinUrlHashmap = new Map<string, number>()
+
+    for (const cache of caches) {
+      if (this.alsoFindInputsInSourceCaches.includes(cache.source)) {
+        const service = EnrichmentSourceServiceFactory.getEnrichmentSourceService(
+          cache.source,
+          this.log,
+        )
+        const normalized = service.normalize(cache.data) as IMemberEnrichmentDataNormalized
+        // only the first linkedin identity of each cache row is considered
+        const identity = normalized?.identities?.find((i) => i.platform === PlatformType.LINKEDIN)
+        if (identity) {
+          if (!linkedinUrlHashmap.get(identity.value)) {
+            consumableIdentities.push({
+              ...identity,
+              repeatedTimesInDifferentSources: 1,
+              isFromVerifiedSource: false,
+            })
+            linkedinUrlHashmap.set(identity.value, 1)
+          } else {
+            const repeatedTimesInDifferentSources = linkedinUrlHashmap.get(identity.value) + 1
+            linkedinUrlHashmap.set(identity.value, repeatedTimesInDifferentSources)
+            consumableIdentities.find(
+              (i) => i.value === identity.value,
+            ).repeatedTimesInDifferentSources = repeatedTimesInDifferentSources
+          }
+        }
+      }
+    }
+
+    // also add the linkedin identity from the input
+    if (input.linkedin && input.linkedin.value && input.linkedin.verified) {
+      if (!linkedinUrlHashmap.get(input.linkedin.value)) {
+        consumableIdentities.push({
+          ...input.linkedin,
+          repeatedTimesInDifferentSources: 1,
+          isFromVerifiedSource: true,
+        })
+      } else {
+        const repeatedTimesInDifferentSources = linkedinUrlHashmap.get(input.linkedin.value) + 1
+        const identityFound = consumableIdentities.find((i) => i.value === input.linkedin.value)
+
+        identityFound.repeatedTimesInDifferentSources = repeatedTimesInDifferentSources
+        identityFound.isFromVerifiedSource = true
+      }
+    }
+    return consumableIdentities
+  }
+
+  /**
+   * Fetches a ProgAI profile for every consumable linkedin handle and tags each with how the
+   * handle was found. Profiles whose id equals the id in the member's existing progai cache
+   * row are skipped. Resolves to null when nothing is left.
+   */
+  async getData(
+    input: IEnrichmentSourceInput,
+  ): Promise<IMemberEnrichmentDataProgAILinkedinScraper[] | null> {
+    const profiles: IMemberEnrichmentDataProgAILinkedinScraper[] = []
+    const caches = await findMemberEnrichmentCacheForAllSources(input.memberId)
+    // loop-invariant, so looked up once instead of per handle
+    const existingProgaiCache = caches.find((c) => c.source === MemberEnrichmentSource.PROGAI)
+
+    const consumableIdentities = await this.findConsumableLinkedinIdentities(input, caches)
+
+    for (const identity of consumableIdentities) {
+      const data = await this.getDataUsingLinkedinHandle(identity.value)
+      if (data) {
+        // we don't want to reinforce the cache with the same data, only save to cache
+        // if a new profile is returned from progai
+        if (
+          existingProgaiCache &&
+          existingProgaiCache.data &&
+          (existingProgaiCache.data as IMemberEnrichmentDataProgAI).id == data.id
+        ) {
+          continue
+        }
+        profiles.push({
+          ...data,
+          metadata: {
+            repeatedTimesInDifferentSources: identity.repeatedTimesInDifferentSources,
+            isFromVerifiedSource: identity.isFromVerifiedSource,
+          },
+        })
+      }
+    }
+
+    return profiles.length > 0 ? profiles : null
+  }
+
+  /** Calls ProgAI `/get_profile` for `https://linkedin.com/in/<handle>`; null if no profile. */
+  private async getDataUsingLinkedinHandle(handle: string): Promise<IMemberEnrichmentDataProgAI> {
+    const url = `${process.env['CROWD_ENRICHMENT_PROGAI_URL']}/get_profile`
+    const config = {
+      method: 'get',
+      url,
+      params: {
+        linkedin_url: `https://linkedin.com/in/${handle}`,
+        with_emails: true,
+        api_key: process.env['CROWD_ENRICHMENT_PROGAI_API_KEY'],
+      },
+      headers: {},
+    }
+
+    const response: IMemberEnrichmentDataProgAIResponse = (await axios(config)).data
+
+    return response?.profile || null
+  }
+
+  /**
+   * Normalizes every profile with the progai service and carries its `metadata` over.
+   * Returns null, not an empty array, when there are no profiles.
+   */
+  normalize(
+    profiles: IMemberEnrichmentDataProgAILinkedinScraper[],
+  ): IMemberEnrichmentDataNormalized[] {
+    const normalizedProfiles: IMemberEnrichmentDataNormalized[] = []
+    const progaiService = EnrichmentSourceServiceFactory.getEnrichmentSourceService(
+      MemberEnrichmentSource.PROGAI,
+      this.log,
+    )
+
+    for (const profile of profiles) {
+      const normalized = progaiService.normalize(profile) as IMemberEnrichmentDataNormalized
+      normalizedProfiles.push({ ...normalized, metadata: profile.metadata })
+    }
+
+    return normalizedProfiles.length > 0 ? normalizedProfiles : null
+  }
+}
diff --git a/services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/types.ts b/services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/types.ts
new file mode 100644
index 0000000000..af7bda13c7
--- /dev/null
+++ b/services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/types.ts
@@ -0,0 +1,8 @@
+import { IMemberEnrichmentDataProgAI } from '../progai/types'
+
+export interface IMemberEnrichmentDataProgAILinkedinScraper extends IMemberEnrichmentDataProgAI {
+  metadata: {
+    repeatedTimesInDifferentSources: number
+    isFromVerifiedSource: boolean
+  }
+}
diff --git a/services/apps/premium/members_enrichment_worker/src/sources/progai/service.ts b/services/apps/premium/members_enrichment_worker/src/sources/progai/service.ts
index e630110b2f..5b6dc736f4 100644
--- a/services/apps/premium/members_enrichment_worker/src/sources/progai/service.ts
+++ b/services/apps/premium/members_enrichment_worker/src/sources/progai/service.ts
@@ -118,7 +118,7 @@ export default class EnrichmentServiceProgAI extends LoggerBase implements IEnri
     super(log)
   }
 
-  isEnrichableBySource(input: IEnrichmentSourceInput): boolean {
+  async isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean> {
     const enrichableUsingGithubHandle = !!input.github?.value
     const enrichableUsingEmail = this.alsoUseEmailIdentitiesForEnrichment && !!input.email?.value
     return enrichableUsingGithubHandle || enrichableUsingEmail
@@ -309,47 +309,36 @@ export default class EnrichmentServiceProgAI extends LoggerBase implements IEnri
   }
 
   async getDataUsingGitHubHandle(githubUsername: string): Promise<IMemberEnrichmentDataProgAI> {
-    let response: IMemberEnrichmentDataProgAIResponse
-
-    try {
-      const url = `${process.env['CROWD_ENRICHMENT_PROGAI_URL']}/get_profile`
-      const config = {
-        method: 'get',
-        url,
-        params: {
-          github_handle: githubUsername,
-          with_emails: true,
-          api_key: process.env['CROWD_ENRICHMENT_PROGAI_API_KEY'],
-        },
-        headers: {},
-      }
-
-      response = (await axios(config)).data
-    } catch (err) {
-      throw new Error(err)
+    const url = `${process.env['CROWD_ENRICHMENT_PROGAI_URL']}/get_profile`
+    const config = {
+      method: 'get',
+      url,
+      params: {
+        github_handle: githubUsername,
+        with_emails: true,
+        api_key: process.env['CROWD_ENRICHMENT_PROGAI_API_KEY'],
+      },
+      headers: {},
     }
 
-    return response.profile
+    const response: IMemberEnrichmentDataProgAIResponse = (await axios(config)).data
+    return response?.profile || null
   }
 
   async getDataUsingEmailAddress(email: string): Promise<IMemberEnrichmentDataProgAI> {
-    try {
-      const url = `${process.env['CROWD_ENRICHMENT_PROGAI_URL']}/get_profile`
-      const config = {
-        method: 'get',
-        url,
-        params: {
-          email,
-          with_emails: true,
-          api_key: process.env['CROWD_ENRICHMENT_PROGAI_API_KEY'],
-        },
-        headers: {},
-      }
-
-      const response = (await axios(config)).data
-      return response.profile
-    } catch (err) {
-      throw new Error(err)
+    const url = `${process.env['CROWD_ENRICHMENT_PROGAI_URL']}/get_profile`
+    const config = {
+      method: 'get',
+      url,
+      params: {
+        email,
+        with_emails: true,
+        api_key: process.env['CROWD_ENRICHMENT_PROGAI_API_KEY'],
+      },
+      headers: {},
     }
+
+    const response = (await axios(config)).data
+    return response?.profile || null
   }
 }
diff --git a/services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts b/services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts
index d89bb5d470..bb3791a480 100644
--- a/services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts
+++ b/services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts
@@ -36,7 +36,7 @@ export default class EnrichmentServiceSerpApi extends LoggerBase implements IEnr
     super(log)
   }
 
-  isEnrichableBySource(input: IEnrichmentSourceInput): boolean {
+  async isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean> {
     const displayNameSplit = input.displayName?.split(' ')
     return (
       displayNameSplit?.length > 1 &&
@@ -126,7 +126,7 @@ export default class EnrichmentServiceSerpApi extends LoggerBase implements IEnr
           platform: PlatformType.LINKEDIN,
           type: MemberIdentityType.USERNAME,
           verified: false,
-          value: this.normalizeLinkedUrl(data.linkedinUrl),
+          value: this.normalizeLinkedUrl(data.linkedinUrl).split('/').pop(),
         },
       ],
     }
diff --git a/services/apps/premium/members_enrichment_worker/src/types.ts b/services/apps/premium/members_enrichment_worker/src/types.ts
index 4bffd0555e..4229dea19e 100644
--- a/services/apps/premium/members_enrichment_worker/src/types.ts
+++ b/services/apps/premium/members_enrichment_worker/src/types.ts
@@ -10,10 +10,12 @@ import {
 } from '@crowd/types'
 
 import { IMemberEnrichmentDataClearbit } from './sources/clearbit/types'
+import { IMemberEnrichmentDataProgAILinkedinScraper } from './sources/progai-linkedin-scraper/types'
 import { IMemberEnrichmentDataProgAI } from './sources/progai/types'
 import { IMemberEnrichmentDataSerp } from './sources/serp/types'
 
 export interface IEnrichmentSourceInput {
+  memberId: string
   github?: IMemberIdentity
   linkedin?: IMemberIdentity
   email?: IMemberIdentity
@@ -27,6 +29,7 @@ export type IMemberEnrichmentData =
   | IMemberEnrichmentDataProgAI
   | IMemberEnrichmentDataClearbit
   | IMemberEnrichmentDataSerp
+  | IMemberEnrichmentDataProgAILinkedinScraper[]
 
 export interface IEnrichmentService {
   source: MemberEnrichmentSource
@@ -35,7 +38,7 @@ export interface IEnrichmentService {
   cacheObsoleteAfterSeconds: number
 
   // can the source enrich using this input
-  isEnrichableBySource(input: IEnrichmentSourceInput): boolean
+  isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean>
 
   // does the source have credits to enrich members, if returned false the source will be skipped
   // response will be saved to redis for 60 seconds and will be used for subsequent calls
@@ -49,7 +52,9 @@ export interface IEnrichmentService {
 
   // should either return the data or null if it's a miss
   getData(input: IEnrichmentSourceInput): Promise<IMemberEnrichmentData | null>
-  normalize(data: IMemberEnrichmentData): IMemberEnrichmentDataNormalized
+  normalize(
+    data: IMemberEnrichmentData,
+  ): IMemberEnrichmentDataNormalized | IMemberEnrichmentDataNormalized[]
 }
 
 export interface IMemberEnrichmentDataNormalized {
@@ -58,6 +63,7 @@ export interface IMemberEnrichmentDataNormalized {
   attributes?: IAttributes
   memberOrganizations?: IMemberEnrichmentDataNormalizedOrganization[]
   displayName?: string
+  metadata?: Record<string, unknown>
 }
 
 export interface IMemberEnrichmentDataNormalizedOrganization {
diff --git a/services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts b/services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts
index de65524a62..5295043c7c 100644
--- a/services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts
+++ b/services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts
@@ -43,6 +43,7 @@ export async function enrichMember(
     // cache is obsolete when it's not found or cache.updatedAt is older than cacheObsoleteAfterSeconds
     if (await isCacheObsolete(source, cache)) {
       const enrichmentInput: IEnrichmentSourceInput = {
+        memberId: input.id,
         email: input.identities.find((i) => i.verified && i.type === MemberIdentityType.EMAIL),
         linkedin: input.identities.find(
           (i) =>
diff --git a/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts b/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts
index fc68598cac..5b4a639e65 100644
--- a/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts
+++ b/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts
@@ -14,7 +14,7 @@ import { IGetMembersForEnrichmentArgs } from '../types'
 import { enrichMember } from './enrichMember'
 
 const { getEnrichableMembers } = proxyActivities<typeof activities>({
-  startToCloseTimeout: '10 seconds',
+  startToCloseTimeout: '2 minutes',
 })
 
 export async function getMembersToEnrich(args: IGetMembersForEnrichmentArgs): Promise<void> {
@@ -24,6 +24,7 @@ export async function getMembersToEnrich(args: IGetMembersForEnrichmentArgs): Pr
     MemberEnrichmentSource.PROGAI,
     MemberEnrichmentSource.CLEARBIT,
     MemberEnrichmentSource.SERP,
+    MemberEnrichmentSource.PROGAI_LINKEDIN_SCRAPER,
   ]
 
   const members = await getEnrichableMembers(MEMBER_ENRICHMENT_PER_RUN, sources, afterCursor)
diff --git a/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts b/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts
index 1f7d8b6915..ec0df7b091 100644
--- a/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts
+++ b/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts
@@ -531,3 +531,20 @@ export async function findMemberEnrichmentCacheDb(
 
   return result ?? null
 }
+
+export async function findMemberEnrichmentCacheForAllSourcesDb<T>(
+  tx: DbConnOrTx,
+  memberId: string,
+): Promise<IMemberEnrichmentCache<T>[]> {
+  const result = await tx.manyOrNone(
+    `
+    select *
+    from "memberEnrichmentCache"
+    where 
+      "memberId" = $(memberId) and data is not null;
+    `,
+    { memberId },
+  )
+
+  return result ?? []
+}
diff --git a/services/libs/types/src/enums/enrichment.ts b/services/libs/types/src/enums/enrichment.ts
index 6c9ecc7d28..b3c939d67d 100644
--- a/services/libs/types/src/enums/enrichment.ts
+++ b/services/libs/types/src/enums/enrichment.ts
@@ -2,4 +2,5 @@ export enum MemberEnrichmentSource {
   PROGAI = 'progai',
   CLEARBIT = 'clearbit',
   SERP = 'serp',
+  PROGAI_LINKEDIN_SCRAPER = 'progai-linkedin-scraper',
 }