Skip to content
Merged

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
isCacheObsolete,
isEnrichableBySource,
normalizeEnrichmentData,
refreshMemberEnrichmentMaterializedView,
touchMemberEnrichmentCacheUpdatedAt,
updateMemberEnrichmentCache,
} from './activities/enrichment'
Expand Down Expand Up @@ -52,4 +53,5 @@ export {
updateMemberEnrichmentCache,
isEnrichableBySource,
findMemberIdentityWithTheMostActivityInPlatform,
refreshMemberEnrichmentMaterializedView,
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
touchMemberEnrichmentCacheUpdatedAtDb,
updateMemberEnrichmentCacheDb,
} from '@crowd/data-access-layer/src/old/apps/premium/members_enrichment_worker'
import { refreshMaterializedView } from '@crowd/data-access-layer/src/utils'
import { RedisCache } from '@crowd/redis'
import {
IEnrichableMemberIdentityActivityAggregate,
Expand Down Expand Up @@ -136,3 +137,7 @@ export async function findMemberIdentityWithTheMostActivityInPlatform(
): Promise<IEnrichableMemberIdentityActivityAggregate> {
return findMemberIdentityWithTheMostActivityInPlatformQuestDb(svc.questdbSQL, memberId, platform)
}

export async function refreshMemberEnrichmentMaterializedView(mvName: string): Promise<void> {
await refreshMaterializedView(svc.postgres.writer.connection(), mvName)
}
7 changes: 6 additions & 1 deletion services/apps/premium/members_enrichment_worker/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ import { Config } from '@crowd/archetype-standard'
import { Options, ServiceWorker } from '@crowd/archetype-worker'
import { Edition } from '@crowd/types'

import { scheduleMembersEnrichment, scheduleMembersLFIDEnrichment } from './schedules'
import {
scheduleMembersEnrichment,
scheduleMembersLFIDEnrichment,
scheduleRefreshMembersEnrichmentMaterializedViews,
} from './schedules'

const config: Config = {
envvars: [
Expand Down Expand Up @@ -43,6 +47,7 @@ export const svc = new ServiceWorker(config, options)
setImmediate(async () => {
await svc.init()

await scheduleRefreshMembersEnrichmentMaterializedViews()
await scheduleMembersEnrichment()

if (process.env['CROWD_EDITION'] === Edition.LFX) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import {
scheduleMembersEnrichment,
scheduleMembersLFIDEnrichment,
scheduleRefreshMembersEnrichmentMaterializedViews,
} from './schedules/getMembersToEnrich'

export { scheduleMembersEnrichment, scheduleMembersLFIDEnrichment }
export {
scheduleMembersEnrichment,
scheduleMembersLFIDEnrichment,
scheduleRefreshMembersEnrichmentMaterializedViews,
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/clien
import { IS_DEV_ENV, IS_TEST_ENV } from '@crowd/common'

import { svc } from '../main'
import { getMembersForLFIDEnrichment, getMembersToEnrich } from '../workflows'
import {
getMembersForLFIDEnrichment,
getMembersToEnrich,
refreshMemberEnrichmentMaterializedViews,
} from '../workflows'

export const scheduleMembersEnrichment = async () => {
try {
Expand Down Expand Up @@ -42,6 +46,38 @@ export const scheduleMembersEnrichment = async () => {
}
}

export const scheduleRefreshMembersEnrichmentMaterializedViews = async () => {
try {
await svc.temporal.schedule.create({
scheduleId: 'refresh-members-enrichment-materialized-views',
spec: {
cronExpressions: ['0 5 * * *'],
},
policies: {
overlap: ScheduleOverlapPolicy.SKIP,
catchupWindow: '1 minute',
},
action: {
type: 'startWorkflow',
workflowType: refreshMemberEnrichmentMaterializedViews,
taskQueue: 'members-enrichment',
retry: {
initialInterval: '15 seconds',
backoffCoefficient: 2,
maximumAttempts: 3,
},
},
})
} catch (err) {
if (err instanceof ScheduleAlreadyRunning) {
svc.log.info('Schedule already registered in Temporal.')
svc.log.info('Configuration may have changed since. Please make sure they are in sync.')
} else {
throw new Error(err)
}
}
}

export const scheduleMembersLFIDEnrichment = async () => {
try {
await svc.temporal.schedule.create({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn
public platform = `enrichment-${this.source}`
public enrichMembersWithActivityMoreThan = 10

public enrichableBySql = `"activitySummary".total_count > ${this.enrichMembersWithActivityMoreThan} AND mi.type = 'email' and mi.verified`
public enrichableBySql = `"membersGlobalActivityCount".total_count > ${this.enrichMembersWithActivityMoreThan} AND mi.type = 'email' and mi.verified`

// bust cache after 120 days
public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 120
Expand Down Expand Up @@ -172,7 +172,9 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn
if (data.linkedin?.handle) {
normalized = normalizeSocialIdentity(
{
handle: data.linkedin.handle.split('/').pop(),
handle: data.linkedin.handle.endsWith('/')
? data.linkedin.handle.slice(0, -1).split('/').pop()
: data.linkedin.handle.split('/').pop(),
platform: PlatformType.LINKEDIN,
},
MemberIdentityType.USERNAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export default class EnrichmentServiceCrustdata extends LoggerBase implements IE

public enrichMembersWithActivityMoreThan = 1000

public enrichableBySql = `("activitySummary".total_count > ${this.enrichMembersWithActivityMoreThan}) AND mi.verified AND mi.type = 'username' and mi.platform = 'linkedin'`
public enrichableBySql = `("membersGlobalActivityCount".total_count > ${this.enrichMembersWithActivityMoreThan}) AND mi.verified AND mi.type = 'username' and mi.platform = 'linkedin'`

public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 90

Expand Down Expand Up @@ -216,6 +216,7 @@ export default class EnrichmentServiceCrustdata extends LoggerBase implements IE
if (!linkedinUrlHashmap.get(input.linkedin.value)) {
consumableIdentities.push({
...input.linkedin,
value: input.linkedin.value.replace(/\//g, ''),
repeatedTimesInDifferentSources: 1,
isFromVerifiedSource: true,
})
Expand Down Expand Up @@ -279,7 +280,15 @@ export default class EnrichmentServiceCrustdata extends LoggerBase implements IE
}

if (data.email) {
for (const email of data.email.split(',').filter(isEmail)) {
let emails: string[]

if (Array.isArray(data.email)) {
emails = data.email
} else {
emails = data.email.split(',').filter(isEmail)
}

for (const email of emails) {
normalized.identities.push({
type: MemberIdentityType.EMAIL,
platform: this.platform,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export interface IMemberEnrichmentDataCrustdata {
linkedin_profile_url: string
linkedin_flagship_url: string
name: string
email: string
email: string | string[]
title: string
last_updated: string
headline: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ export default class EnrichmentServiceProgAILinkedinScraper
if (!linkedinUrlHashmap.get(input.linkedin.value)) {
consumableIdentities.push({
...input.linkedin,
value: input.linkedin.value.replace(/\//g, ''),
repeatedTimesInDifferentSources: 1,
isFromVerifiedSource: true,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export default class EnrichmentServiceSerpApi extends LoggerBase implements IEnr
public enrichMembersWithActivityMoreThan = 500

public enrichableBySql = `
("activitySummary".total_count > ${this.enrichMembersWithActivityMoreThan}) AND
("membersGlobalActivityCount".total_count > ${this.enrichMembersWithActivityMoreThan}) AND
(members."displayName" like '% %') AND
(members.attributes->'location'->>'default' is not null and members.attributes->'location'->>'default' <> '') AND
((members.attributes->'websiteUrl'->>'default' is not null and members.attributes->'websiteUrl'->>'default' <> '') OR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export interface IEnrichmentService {
// SQL filter to get enrichable members for a source
// members table is available as "members" alias
// memberIdentities table is available as "mi" alias
// activity count is available in "activitySummary" alias, "activitySummary".total_count field
// activity count is available in "membersGlobalActivityCount" alias, "membersGlobalActivityCount".total_count field
enrichableBySql: string

// should either return the data or null if it's a miss
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,11 @@ export function normalizeAttributes(

return normalized
}

export function chunkArray<T>(array: T[], chunkSize: number): T[][] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have partition in this file that does the same thing I believe.

const chunks = []
for (let i = 0; i < array.length; i += chunkSize) {
chunks.push(array.slice(i, i + chunkSize))
}
return chunks
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { enrichMemberWithLFAuth0 } from './workflows/lf-auth0/enrichMemberWithLF
import { findAndSaveGithubSourceIds } from './workflows/lf-auth0/findAndSaveGithubSourceIds'
import { getEnrichmentData } from './workflows/lf-auth0/getEnrichmentData'
import { getMembersForLFIDEnrichment } from './workflows/lf-auth0/getMembersForLFIDEnrichment'
import { refreshMemberEnrichmentMaterializedViews } from './workflows/refreshMemberEnrichmentMaterializedViews'

export {
getMembersToEnrich,
Expand All @@ -12,4 +13,5 @@ export {
enrichMemberWithLFAuth0,
findAndSaveGithubSourceIds,
getEnrichmentData,
refreshMemberEnrichmentMaterializedViews,
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const {
normalizeEnrichmentData,
findMemberIdentityWithTheMostActivityInPlatform,
} = proxyActivities<typeof activities>({
startToCloseTimeout: '20 seconds',
startToCloseTimeout: '1 minute',
retry: {
initialInterval: '5s',
backoffCoefficient: 2.0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import {
proxyActivities,
} from '@temporalio/workflow'

import { MemberEnrichmentSource } from '@crowd/types'
import { IEnrichableMember, MemberEnrichmentSource } from '@crowd/types'

import * as activities from '../activities/getMembers'
import { IGetMembersForEnrichmentArgs } from '../types'
import { chunkArray } from '../utils/common'

import { enrichMember } from './enrichMember'

Expand All @@ -18,7 +19,8 @@ const { getEnrichableMembers } = proxyActivities<typeof activities>({
})

export async function getMembersToEnrich(args: IGetMembersForEnrichmentArgs): Promise<void> {
const MEMBER_ENRICHMENT_PER_RUN = 500
const QUERY_FOR_ENRICHABLE_MEMBERS_PER_RUN = 1000
const PARALLEL_ENRICHMENT_WORKFLOWS = 5
const afterCursor = args?.afterCursor || null
const sources = [
MemberEnrichmentSource.PROGAI,
Expand All @@ -28,37 +30,45 @@ export async function getMembersToEnrich(args: IGetMembersForEnrichmentArgs): Pr
MemberEnrichmentSource.CRUSTDATA,
]

const members = await getEnrichableMembers(MEMBER_ENRICHMENT_PER_RUN, sources, afterCursor)
const members = await getEnrichableMembers(
QUERY_FOR_ENRICHABLE_MEMBERS_PER_RUN,
sources,
afterCursor,
)

if (members.length === 0) {
return
}

await Promise.all(
members.map((member) => {
return executeChild(enrichMember, {
workflowId: 'member-enrichment/' + member.tenantId + '/' + member.id,
cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
workflowExecutionTimeout: '15 minutes',
retry: {
backoffCoefficient: 2,
maximumAttempts: 10,
initialInterval: 2 * 1000,
maximumInterval: 30 * 1000,
},
args: [member, sources],
searchAttributes: {
TenantId: [member.tenantId],
},
})
}),
)
const chunks = chunkArray<IEnrichableMember>(members, PARALLEL_ENRICHMENT_WORKFLOWS)

for (const chunk of chunks) {
await Promise.all(
chunk.map((member) => {
return executeChild(enrichMember, {
workflowId: 'member-enrichment/' + member.tenantId + '/' + member.id,
cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
workflowExecutionTimeout: '15 minutes',
retry: {
backoffCoefficient: 2,
maximumAttempts: 10,
initialInterval: 2 * 1000,
maximumInterval: 30 * 1000,
},
args: [member, sources],
searchAttributes: {
TenantId: [member.tenantId],
},
})
}),
)
}

await continueAsNew<typeof getMembersToEnrich>({
afterCursor: {
memberId: members[members.length - 1].id,
activityCount: members[members.length - 1].activityCount,
memberId: chunks[chunks.length - 1][chunks[chunks.length - 1].length - 1].id,
activityCount: chunks[chunks.length - 1][chunks[chunks.length - 1].length - 1].activityCount,
},
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { proxyActivities } from '@temporalio/workflow'

import { MemberEnrichmentMaterializedView } from '@crowd/types'

import * as activities from '../activities/enrichment'

const { refreshMemberEnrichmentMaterializedView } = proxyActivities<typeof activities>({
startToCloseTimeout: '10 minutes',
})

export async function refreshMemberEnrichmentMaterializedViews(): Promise<void> {
for (const mv of Object.values(MemberEnrichmentMaterializedView)) {
await refreshMemberEnrichmentMaterializedView(mv)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export async function fetchMembersForEnrichment(
afterCursor: { activityCount: number; memberId: string } | null,
): Promise<IEnrichableMember[]> {
const cursorFilter = afterCursor
? `AND ((coalesce("activitySummary".total_count, 0) < $2) OR (coalesce("activitySummary".total_count, 0) = $2 AND members.id < $3))`
? `AND ((coalesce("membersGlobalActivityCount".total_count, 0) < $2) OR (coalesce("membersGlobalActivityCount".total_count, 0) = $2 AND members.id < $3))`
: ''

const sourceInnerQueryItems = []
Expand Down Expand Up @@ -47,18 +47,6 @@ export async function fetchMembersForEnrichment(

return db.connection().query(
`
WITH "activitySummary" AS (
SELECT
msa."memberId",
SUM(msa."activityCount") AS total_count
FROM "memberSegmentsAgg" msa
WHERE msa."segmentId" IN (
SELECT id
FROM segments
WHERE "grandparentId" IS NOT NULL AND "parentId" IS NOT NULL
)
GROUP BY msa."memberId"
)
SELECT
members."id",
members."tenantId",
Expand All @@ -73,11 +61,11 @@ export async function fetchMembersForEnrichment(
'verified', mi.verified
)
) AS identities,
MAX(coalesce("activitySummary".total_count, 0)) AS "activityCount"
MAX(coalesce("membersGlobalActivityCount".total_count, 0)) AS "activityCount"
FROM members
INNER JOIN tenants ON tenants.id = members."tenantId"
INNER JOIN "memberIdentities" mi ON mi."memberId" = members.id
LEFT JOIN "activitySummary" ON "activitySummary"."memberId" = members.id
LEFT JOIN "membersGlobalActivityCount" ON "membersGlobalActivityCount"."memberId" = members.id
WHERE
${enrichableBySqlJoined}
AND tenants."deletedAt" IS NULL
Expand Down
Loading