diff --git a/backend/src/database/migrations/R__memberEnrichmentMaterializedViews.sql b/backend/src/database/migrations/R__memberEnrichmentMaterializedViews.sql index d9588ea398..68d35392b1 100644 --- a/backend/src/database/migrations/R__memberEnrichmentMaterializedViews.sql +++ b/backend/src/database/migrations/R__memberEnrichmentMaterializedViews.sql @@ -314,7 +314,8 @@ from unique_members), select distinct mi."memberId" from "memberIdentities" mi left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mi."memberId" - where mi.verified + where "membersGlobalActivityCount".total_count > 1000 + and mi.verified and mi.platform = 'linkedin' group by mi."memberId"), unique_members as ( @@ -359,7 +360,8 @@ from unique_members), select distinct mi."memberId" from "memberIdentities" mi left join "membersGlobalActivityCount" on "membersGlobalActivityCount"."memberId" = mi."memberId" - where mi.verified + where "membersGlobalActivityCount".total_count > 1000 + and mi.verified and mi.platform = 'linkedin' group by mi."memberId"), unique_members as ( diff --git a/services/apps/premium/members_enrichment_worker/src/activities.ts b/services/apps/premium/members_enrichment_worker/src/activities.ts index addb2befb5..0805b9a6a7 100644 --- a/services/apps/premium/members_enrichment_worker/src/activities.ts +++ b/services/apps/premium/members_enrichment_worker/src/activities.ts @@ -2,6 +2,8 @@ import { findMemberEnrichmentCache, findMemberIdentityWithTheMostActivityInPlatform, getEnrichmentData, + getEnrichmentInput, + getObsoleteSourcesOfMember, insertMemberEnrichmentCache, isCacheObsolete, isEnrichableBySource, @@ -10,7 +12,7 @@ import { touchMemberEnrichmentCacheUpdatedAt, updateMemberEnrichmentCache, } from './activities/enrichment' -import { getEnrichableMembers } from './activities/getMembers' +import { getEnrichableMembers, getMaxConcurrentRequests } from './activities/getMembers' import { refreshToken } from './activities/lf-auth0/authenticateLFAuth0' import { getIdentitiesExistInOtherMembers, @@ -54,4 +56,7 @@ export { isEnrichableBySource, findMemberIdentityWithTheMostActivityInPlatform, refreshMemberEnrichmentMaterializedView, + getEnrichmentInput, + getMaxConcurrentRequests, + getObsoleteSourcesOfMember, } diff --git a/services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts b/services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts index d19031f52b..fc59fead34 100644 --- a/services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts +++ b/services/apps/premium/members_enrichment_worker/src/activities/enrichment.ts @@ -9,9 +9,12 @@ import { import { refreshMaterializedView } from '@crowd/data-access-layer/src/utils' import { RedisCache } from '@crowd/redis' import { + IEnrichableMember, IEnrichableMemberIdentityActivityAggregate, IMemberEnrichmentCache, MemberEnrichmentSource, + MemberIdentityType, + PlatformType, } from '@crowd/types' import { EnrichmentSourceServiceFactory } from '../factory' @@ -41,6 +44,51 @@ export async function getEnrichmentData( return null } +export async function getEnrichmentInput( + input: IEnrichableMember, +): Promise { + const enrichmentInput: IEnrichmentSourceInput = { + memberId: input.id, + email: input.identities.find((i) => i.verified && i.type === MemberIdentityType.EMAIL), + linkedin: input.identities.find( + (i) => + i.verified && + i.platform === PlatformType.LINKEDIN && + i.type === MemberIdentityType.USERNAME, + ), + displayName: input.displayName || undefined, + website: input.website || undefined, + location: input.location || undefined, + activityCount: input.activityCount || 0, + } + + // there can be multiple verified identities in github, we select the one with the most activities + const verifiedGithubIdentities = input.identities.filter( + (i) => + i.verified && i.platform === PlatformType.GITHUB && i.type === MemberIdentityType.USERNAME, + ) + + if (verifiedGithubIdentities.length > 1) { + const ghIdentityWithTheMostActivities = await findMemberIdentityWithTheMostActivityInPlatform( + input.id, + PlatformType.GITHUB, + ) + if (ghIdentityWithTheMostActivities) { + enrichmentInput.github = input.identities.find( + (i) => + i.verified && + i.platform === PlatformType.GITHUB && + i.type === MemberIdentityType.USERNAME && + i.value === ghIdentityWithTheMostActivities.username, + ) + } + } else { + enrichmentInput.github = verifiedGithubIdentities?.[0] || undefined + } + + return enrichmentInput +} + export async function normalizeEnrichmentData( source: MemberEnrichmentSource, data: IMemberEnrichmentData, @@ -53,6 +101,13 @@ export async function isCacheObsolete( source: MemberEnrichmentSource, cache: IMemberEnrichmentCache, ): Promise { + return isCacheObsoleteSync(source, cache) +} + +export function isCacheObsoleteSync( + source: MemberEnrichmentSource, + cache: IMemberEnrichmentCache, +): boolean { const service = EnrichmentSourceServiceFactory.getEnrichmentSourceService(source, svc.log) return ( !cache || @@ -104,8 +159,13 @@ export async function findMemberEnrichmentCache( export async function findMemberEnrichmentCacheForAllSources( memberId: string, + returnRowsWithoutData = false, ): Promise[]> { - return findMemberEnrichmentCacheForAllSourcesDb(svc.postgres.reader.connection(), memberId) + return findMemberEnrichmentCacheForAllSourcesDb( + svc.postgres.reader.connection(), + memberId, + returnRowsWithoutData, + ) } export async function insertMemberEnrichmentCache( @@ -138,6 +198,20 @@ export async function findMemberIdentityWithTheMostActivityInPlatform( return findMemberIdentityWithTheMostActivityInPlatformQuestDb(svc.questdbSQL, memberId, platform) } +export async function getObsoleteSourcesOfMember( + memberId: string, + possibleSources: MemberEnrichmentSource[], +): Promise { + const caches = await findMemberEnrichmentCacheForAllSources(memberId, true) + const obsoleteSources = possibleSources.filter((source) => + isCacheObsoleteSync( + source, + caches.find((c) => c.source === source), + ), + ) + return obsoleteSources +} + export async function refreshMemberEnrichmentMaterializedView(mvName: string): Promise { await refreshMaterializedView(svc.postgres.writer.connection(), mvName) } diff --git a/services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts b/services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts index 09698a6512..ebabb35501 100644 --- a/services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts +++ b/services/apps/premium/members_enrichment_worker/src/activities/getMembers.ts @@ -7,11 +7,11 @@ import { import { EnrichmentSourceServiceFactory } from '../factory' import { svc } from '../main' +import { IEnrichmentService } from '../types' export async function getEnrichableMembers( limit: number, sources: MemberEnrichmentSource[], - afterCursor: { activityCount: number; memberId: string } | null, ): Promise { let rows: IEnrichableMember[] = [] const sourceInputs: IMemberEnrichmentSourceQueryInput[] = sources.map((s) => { @@ -23,7 +23,34 @@ export async function getEnrichableMembers( } }) const db = svc.postgres.reader - rows = await fetchMembersForEnrichment(db, limit, sourceInputs, afterCursor) + rows = await fetchMembersForEnrichment(db, limit, sourceInputs) return rows } + +// Get the most strict parallelism among existing and enrichable sources +// We only check sources that has activity count cutoff in current range +export async function getMaxConcurrentRequests( + members: IEnrichableMember[], + possibleSources: MemberEnrichmentSource[], + concurrencyLimit: number, +): Promise { + const serviceMap: Partial> = {} + const currentProcessingActivityCount = members[0].activityCount + + let maxConcurrentRequestsInAllSources = concurrencyLimit + + for (const source of possibleSources) { + serviceMap[source] = EnrichmentSourceServiceFactory.getEnrichmentSourceService(source, svc.log) + const activityCountCutoff = serviceMap[source].enrichMembersWithActivityMoreThan + if (!activityCountCutoff || activityCountCutoff <= currentProcessingActivityCount) { + maxConcurrentRequestsInAllSources = Math.min( + maxConcurrentRequestsInAllSources, + serviceMap[source].maxConcurrentRequests, + ) + } + } + svc.log.info('Setting max concurrent requests', { maxConcurrentRequestsInAllSources }) + + return maxConcurrentRequestsInAllSources +} diff --git a/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts b/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts index 948fc69400..e8aa7629a3 100644 --- a/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts +++ b/services/apps/premium/members_enrichment_worker/src/schedules/getMembersToEnrich.ts @@ -29,11 +29,7 @@ export const scheduleMembersEnrichment = async () => { backoffCoefficient: 2, maximumAttempts: 3, }, - args: [ - { - afterCursor: null, - }, - ], + args: [], }, }) } catch (err) { diff --git a/services/apps/premium/members_enrichment_worker/src/sources/clearbit/service.ts b/services/apps/premium/members_enrichment_worker/src/sources/clearbit/service.ts index 7cfbb25df6..b505cfe52f 100644 --- a/services/apps/premium/members_enrichment_worker/src/sources/clearbit/service.ts +++ b/services/apps/premium/members_enrichment_worker/src/sources/clearbit/service.ts @@ -31,6 +31,8 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn public enrichableBySql = `"membersGlobalActivityCount".total_count > ${this.enrichMembersWithActivityMoreThan} AND mi.type = 'email' and mi.verified` + public maxConcurrentRequests = 15 + // bust cache after 120 days public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 120 @@ -57,7 +59,11 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn } async isEnrichableBySource(input: IEnrichmentSourceInput): Promise { - return !!input.email?.value && input.email?.verified + return ( + input.activityCount > this.enrichMembersWithActivityMoreThan && + !!input.email?.value && + input.email?.verified + ) } async getData(input: IEnrichmentSourceInput): Promise { diff --git a/services/apps/premium/members_enrichment_worker/src/sources/crustdata/service.ts b/services/apps/premium/members_enrichment_worker/src/sources/crustdata/service.ts index 80aa3abebb..ba65dddbff 100644 --- a/services/apps/premium/members_enrichment_worker/src/sources/crustdata/service.ts +++ b/services/apps/premium/members_enrichment_worker/src/sources/crustdata/service.ts @@ -47,6 +47,8 @@ export default class EnrichmentServiceCrustdata extends LoggerBase implements IE public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 90 + public maxConcurrentRequests = 5 + public attributeSettings: IMemberEnrichmentAttributeSettings = { [MemberAttributeName.AVATAR_URL]: { fields: ['profile_picture_url'], diff --git a/services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/service.ts b/services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/service.ts index b0b75f9733..0e683ac2b3 100644 --- a/services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/service.ts +++ b/services/apps/premium/members_enrichment_worker/src/sources/progai-linkedin-scraper/service.ts @@ -37,6 +37,8 @@ export default class EnrichmentServiceProgAILinkedinScraper public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 90 + public maxConcurrentRequests = 1000 + constructor(public readonly log: Logger) { super(log) } diff --git a/services/apps/premium/members_enrichment_worker/src/sources/progai/service.ts b/services/apps/premium/members_enrichment_worker/src/sources/progai/service.ts index 5b6dc736f4..4d5bf149a2 100644 --- a/services/apps/premium/members_enrichment_worker/src/sources/progai/service.ts +++ b/services/apps/premium/members_enrichment_worker/src/sources/progai/service.ts @@ -33,11 +33,13 @@ export default class EnrichmentServiceProgAI extends LoggerBase implements IEnri public source: MemberEnrichmentSource = MemberEnrichmentSource.PROGAI public platform = `enrichment-${this.source}` - enrichableBySql = `mi.verified and ((mi.type = 'username' AND mi.platform = 'github') OR (mi.type = 'email'))` + public enrichableBySql = `mi.verified and ((mi.type = 'username' AND mi.platform = 'github') OR (mi.type = 'email'))` // bust cache after 90 days public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 90 + public maxConcurrentRequests = 1000 + public attributeSettings: IMemberEnrichmentAttributeSettings = { [MemberAttributeName.AVATAR_URL]: { fields: ['profile_pic_url'], diff --git a/services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts b/services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts index 7559480dae..6a93a08042 100644 --- a/services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts +++ b/services/apps/premium/members_enrichment_worker/src/sources/serp/service.ts @@ -32,6 +32,8 @@ export default class EnrichmentServiceSerpApi extends LoggerBase implements IEnr // bust cache after 120 days public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 120 + public maxConcurrentRequests = 300 + constructor(public readonly log: Logger) { super(log) } diff --git a/services/apps/premium/members_enrichment_worker/src/types.ts b/services/apps/premium/members_enrichment_worker/src/types.ts index e51bba2547..22d896c34c 100644 --- a/services/apps/premium/members_enrichment_worker/src/types.ts +++ b/services/apps/premium/members_enrichment_worker/src/types.ts @@ -41,6 +41,9 @@ export interface IEnrichmentService { // cache rows with older updatedAt than this will be considered obsolete and will be re-enriched cacheObsoleteAfterSeconds: number + // max concurrent requests that can be made to the source + maxConcurrentRequests: number + // can the source enrich using this input isEnrichableBySource(input: IEnrichmentSourceInput): Promise @@ -54,6 +57,9 @@ export interface IEnrichmentService { // activity count is available in "membersGlobalActivityCount" alias, "membersGlobalActivityCount".total_count field enrichableBySql: string + // only enrich members with activity more than this number + enrichMembersWithActivityMoreThan?: number + // should either return the data or null if it's a miss getData(input: IEnrichmentSourceInput): Promise normalize( @@ -88,10 +94,6 @@ export interface IMemberEnrichmentLinkedinScraperMetadata { isFromVerifiedSource: boolean } -export interface IGetMembersForEnrichmentArgs { - afterCursor: { activityCount: number; memberId: string } | null -} - export interface IMemberEnrichmentSocialData { platform: PlatformType handle: string diff --git a/services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts b/services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts index 90b24dfcf1..c1ca4fc3ea 100644 --- a/services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts +++ b/services/apps/premium/members_enrichment_worker/src/workflows/enrichMember.ts @@ -1,11 +1,6 @@ import { proxyActivities } from '@temporalio/workflow' -import { - IEnrichableMember, - MemberEnrichmentSource, - MemberIdentityType, - PlatformType, -} from '@crowd/types' +import { IEnrichableMember, MemberEnrichmentSource } from '@crowd/types' import * as activities from '../activities' import { IEnrichmentSourceInput } from '../types' @@ -19,7 +14,7 @@ const { updateMemberEnrichmentCache, isCacheObsolete, normalizeEnrichmentData, - findMemberIdentityWithTheMostActivityInPlatform, + getEnrichmentInput, } = proxyActivities({ startToCloseTimeout: '1 minute', retry: { @@ -42,44 +37,7 @@ export async function enrichMember( // cache is obsolete when it's not found or cache.updatedAt is older than cacheObsoleteAfterSeconds if (await isCacheObsolete(source, cache)) { - const enrichmentInput: IEnrichmentSourceInput = { - memberId: input.id, - email: input.identities.find((i) => i.verified && i.type === MemberIdentityType.EMAIL), - linkedin: input.identities.find( - (i) => - i.verified && - i.platform === PlatformType.LINKEDIN && - i.type === MemberIdentityType.USERNAME, - ), - displayName: input.displayName || undefined, - website: input.website || undefined, - location: input.location || undefined, - activityCount: input.activityCount || 0, - } - - // there can be multiple verified identities in github, we select the one with the most activities - const verifiedGithubIdentities = input.identities.filter( - (i) => - i.verified && - i.platform === PlatformType.GITHUB && - i.type === MemberIdentityType.USERNAME, - ) - - if (verifiedGithubIdentities.length > 1) { - const ghIdentityWithTheMostActivities = - await findMemberIdentityWithTheMostActivityInPlatform(input.id, PlatformType.GITHUB) - if (ghIdentityWithTheMostActivities) { - enrichmentInput.github = input.identities.find( - (i) => - i.verified && - i.platform === PlatformType.GITHUB && - i.type === MemberIdentityType.USERNAME && - i.value === ghIdentityWithTheMostActivities.username, - ) - } - } else { - enrichmentInput.github = verifiedGithubIdentities?.[0] || undefined - } + const enrichmentInput: IEnrichmentSourceInput = await getEnrichmentInput(input) const data = await getEnrichmentData(source, enrichmentInput) diff --git a/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts b/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts index b5255357d6..8e4482a250 100644 --- a/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts +++ b/services/apps/premium/members_enrichment_worker/src/workflows/getMembersToEnrich.ts @@ -9,19 +9,16 @@ import { import { IEnrichableMember, MemberEnrichmentSource } from '@crowd/types' import * as activities from '../activities/getMembers' -import { IGetMembersForEnrichmentArgs } from '../types' import { chunkArray } from '../utils/common' import { enrichMember } from './enrichMember' -const { getEnrichableMembers } = proxyActivities({ +const { getEnrichableMembers, getMaxConcurrentRequests } = proxyActivities({ startToCloseTimeout: '2 minutes', }) -export async function getMembersToEnrich(args: IGetMembersForEnrichmentArgs): Promise { +export async function getMembersToEnrich(): Promise { const QUERY_FOR_ENRICHABLE_MEMBERS_PER_RUN = 1000 - const PARALLEL_ENRICHMENT_WORKFLOWS = 5 - const afterCursor = args?.afterCursor || null const sources = [ MemberEnrichmentSource.PROGAI, MemberEnrichmentSource.CLEARBIT, @@ -30,17 +27,19 @@ export async function getMembersToEnrich(args: IGetMembersForEnrichmentArgs): Pr MemberEnrichmentSource.CRUSTDATA, ] - const members = await getEnrichableMembers( - QUERY_FOR_ENRICHABLE_MEMBERS_PER_RUN, - sources, - afterCursor, - ) + const members = await getEnrichableMembers(QUERY_FOR_ENRICHABLE_MEMBERS_PER_RUN, sources) if (members.length === 0) { return } - const chunks = chunkArray(members, PARALLEL_ENRICHMENT_WORKFLOWS) + const parallelEnrichmentWorkflows = await getMaxConcurrentRequests( + members, + sources, + QUERY_FOR_ENRICHABLE_MEMBERS_PER_RUN, + ) + + const chunks = chunkArray(members, parallelEnrichmentWorkflows) for (const chunk of chunks) { await Promise.all( @@ -65,10 +64,5 @@ export async function getMembersToEnrich(args: IGetMembersForEnrichmentArgs): Pr ) } - await continueAsNew({ - afterCursor: { - memberId: chunks[chunks.length - 1][chunks[chunks.length - 1].length - 1].id, - activityCount: chunks[chunks.length - 1][chunks[chunks.length - 1].length - 1].activityCount, - }, - }) + await continueAsNew() } diff --git a/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts b/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts index 46d719e3f0..63d939c6cf 100644 --- a/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts +++ b/services/libs/data-access-layer/src/old/apps/premium/members_enrichment_worker/index.ts @@ -12,21 +12,24 @@ import { OrganizationSource, } from '@crowd/types' +/** + * Gets enrichable members using the provided sources + * If a member is enrichable in one source, and not enrichable in another, the member will be returned + * Members with at least one missing or old source cache rows will be returned + * The reason we're not checking enrichable members and cache age in the same subquery is because of linkedin scraper sources. + * These sources also use data from other sources and it's costly to check cache data jsons. + * This check is instead done in the application layer. + */ export async function fetchMembersForEnrichment( db: DbStore, limit: number, sourceInputs: IMemberEnrichmentSourceQueryInput[], - afterCursor: { activityCount: number; memberId: string } | null, ): Promise { - const cursorFilter = afterCursor - ? `AND ((coalesce("membersGlobalActivityCount".total_count, 0) < $2) OR (coalesce("membersGlobalActivityCount".total_count, 0) = $2 AND members.id < $3))` - : '' - - const sourceInnerQueryItems = [] + const cacheAgeInnerQueryItems = [] const enrichableBySqlConditions = [] sourceInputs.forEach((input) => { - sourceInnerQueryItems.push( + cacheAgeInnerQueryItems.push( ` ( NOT EXISTS ( SELECT 1 FROM "memberEnrichmentCache" mec @@ -70,13 +73,12 @@ export async function fetchMembersForEnrichment( ${enrichableBySqlJoined} AND tenants."deletedAt" IS NULL AND members."deletedAt" IS NULL - AND (${sourceInnerQueryItems.join(' OR ')}) - ${cursorFilter} + AND (${cacheAgeInnerQueryItems.join(' OR ')}) GROUP BY members.id - ORDER BY "activityCount" DESC, members.id DESC + ORDER BY "activityCount" DESC LIMIT $1; `, - afterCursor ? [limit, afterCursor.activityCount, afterCursor.memberId] : [limit], + [limit], ) } @@ -523,13 +525,15 @@ export async function findMemberEnrichmentCacheDb( export async function findMemberEnrichmentCacheForAllSourcesDb( tx: DbConnOrTx, memberId: string, + returnRowsWithoutData = false, ): Promise[]> { + const dataFilter = returnRowsWithoutData ? '' : 'and data is not null' const result = await tx.manyOrNone( ` select * from "memberEnrichmentCache" where - "memberId" = $(memberId) and data is not null; + "memberId" = $(memberId) ${dataFilter}; `, { memberId }, )