Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { findMemberIdentityWithTheMostActivityInPlatform as findMemberIdentityWithTheMostActivityInPlatformQuestDb } from '@crowd/data-access-layer/src/activities'
import {
findMemberEnrichmentCacheDb,
findMemberEnrichmentCacheForAllSourcesDb,
insertMemberEnrichmentCacheDb,
touchMemberEnrichmentCacheUpdatedAtDb,
updateMemberEnrichmentCacheDb,
Expand Down Expand Up @@ -42,7 +43,7 @@ export async function getEnrichmentData(
export async function normalizeEnrichmentData(
source: MemberEnrichmentSource,
data: IMemberEnrichmentData,
): Promise<IMemberEnrichmentDataNormalized> {
): Promise<IMemberEnrichmentDataNormalized | IMemberEnrichmentDataNormalized[]> {
const service = EnrichmentSourceServiceFactory.getEnrichmentSourceService(source, svc.log)
return service.normalize(data)
}
Expand Down Expand Up @@ -100,6 +101,12 @@ export async function findMemberEnrichmentCache(
return findMemberEnrichmentCacheDb(svc.postgres.reader.connection(), memberId, source)
}

/**
 * Fetches a member's enrichment cache rows by delegating to
 * `findMemberEnrichmentCacheForAllSourcesDb` on the postgres reader connection.
 *
 * @param memberId - id of the member whose cache rows are requested
 * @returns the cache rows resolved by the data-access function
 */
export async function findMemberEnrichmentCacheForAllSources(
  memberId: string,
): Promise<IMemberEnrichmentCache<IMemberEnrichmentData>[]> {
  const readerConnection = svc.postgres.reader.connection()
  return findMemberEnrichmentCacheForAllSourcesDb(readerConnection, memberId)
}

export async function insertMemberEnrichmentCache(
source: MemberEnrichmentSource,
memberId: string,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Logger } from '@crowd/logging'
import { MemberEnrichmentSource } from '@crowd/types'

import EnrichmentServiceClearbit from './sources/clearbit/service'
import EnrichmentServiceProgAILinkedinScraper from './sources/progai-linkedin-scraper/service'
import EnrichmentServiceProgAI from './sources/progai/service'
import EnrichmentServiceSerpApi from './sources/serp/service'
import { IEnrichmentService } from './types'
Expand All @@ -24,6 +25,8 @@ export class EnrichmentSourceServiceFactory {
return new EnrichmentServiceClearbit(log)
case MemberEnrichmentSource.SERP:
return new EnrichmentServiceSerpApi(log)
case MemberEnrichmentSource.PROGAI_LINKEDIN_SCRAPER:
return new EnrichmentServiceProgAILinkedinScraper(log)
default:
throw new Error(`Enrichment service for ${source} is not found!`)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export default class EnrichmentServiceClearbit extends LoggerBase implements IEn
super(log)
}

isEnrichableBySource(input: IEnrichmentSourceInput): boolean {
async isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean> {
return !!input.email?.value && input.email?.verified
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import axios from 'axios'

import { Logger, LoggerBase } from '@crowd/logging'
import {
IMemberEnrichmentCache,
IMemberIdentity,
MemberEnrichmentSource,
PlatformType,
} from '@crowd/types'

import { findMemberEnrichmentCacheForAllSources } from '../../activities/enrichment'
import { EnrichmentSourceServiceFactory } from '../../factory'
import {
IEnrichmentService,
IEnrichmentSourceInput,
IMemberEnrichmentData,
IMemberEnrichmentDataNormalized,
} from '../../types'
import { IMemberEnrichmentDataProgAI, IMemberEnrichmentDataProgAIResponse } from '../progai/types'

import { IMemberEnrichmentDataProgAILinkedinScraper } from './types'

export default class EnrichmentServiceProgAILinkedinScraper
extends LoggerBase
implements IEnrichmentService
{
public source: MemberEnrichmentSource = MemberEnrichmentSource.PROGAI_LINKEDIN_SCRAPER
public platform = `enrichment-${this.source}`

public alsoFindInputsInSourceCaches: MemberEnrichmentSource[] = [
MemberEnrichmentSource.PROGAI,
MemberEnrichmentSource.CLEARBIT,
MemberEnrichmentSource.SERP,
]

public enrichableBySql = `(mi.verified AND mi.type = 'username' and mi.platform = 'linkedin')`

public cacheObsoleteAfterSeconds = 60 * 60 * 24 * 90

constructor(public readonly log: Logger) {
super(log)
}

/**
 * Decides whether the member can be enriched by this source.
 *
 * A member qualifies when the input carries a verified linkedin identity, or when
 * an existing cache row from one of `alsoFindInputsInSourceCaches` normalizes to
 * at least one linkedin identity.
 *
 * @param input - enrichment input of the member being checked
 * @returns true when at least one usable linkedin identity exists, false otherwise
 */
async isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean> {
  // cheapest check first: no cache lookup is needed when the input already qualifies
  if (input.linkedin?.value && input.linkedin.verified) {
    return true
  }

  const caches = await findMemberEnrichmentCacheForAllSources(input.memberId)

  for (const cache of caches) {
    // cache rows may carry no data; there is nothing to normalize in that case
    if (!cache.data || !this.alsoFindInputsInSourceCaches.includes(cache.source)) {
      continue
    }

    const service = EnrichmentSourceServiceFactory.getEnrichmentSourceService(
      cache.source,
      this.log,
    )
    const normalized = service.normalize(cache.data) as IMemberEnrichmentDataNormalized | null

    if (normalized?.identities?.some((i) => i.platform === PlatformType.LINKEDIN)) {
      return true
    }
    // no linkedin identity in this row: keep scanning the remaining rows instead of
    // giving up after the first eligible source
  }

  return false
}

/**
 * Reports whether this source still has credits available.
 * Always resolves to true; no credit lookup is performed here.
 */
async hasRemainingCredits(): Promise<boolean> {
return true
}
Comment on lines +71 to +73
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement actual credit checking.

The method currently always returns true without verifying available credits. This could lead to issues if the ProgAI service has rate limits or credit restrictions.

Consider implementing actual credit checking by:

  1. Checking remaining credits from ProgAI API
  2. Maintaining a local counter for rate limiting
  3. Implementing proper error handling for when credits are exhausted

Would you like me to help implement this functionality?


/**
 * Collects the distinct linkedin identities that can be used to look the member up.
 *
 * Identities come from two places: cache rows of the sources listed in
 * `alsoFindInputsInSourceCaches` (first linkedin identity of each normalized row),
 * and the verified linkedin identity of the input itself.
 *
 * @param input - enrichment input of the member
 * @param caches - existing enrichment cache rows of the member
 * @returns one entry per distinct identity value, in first-seen order, where
 *   `repeatedTimesInDifferentSources` counts how many of those places yielded the value
 *   and `isFromVerifiedSource` is true only when the input's verified identity matched
 */
private async findConsumableLinkedinIdentities(
  input: IEnrichmentSourceInput,
  caches: IMemberEnrichmentCache<IMemberEnrichmentData>[],
): Promise<
  (IMemberIdentity & { repeatedTimesInDifferentSources: number; isFromVerifiedSource: boolean })[]
> {
  // keyed by identity value; Map iteration preserves insertion order for the result
  const identitiesByValue = new Map<
    string,
    IMemberIdentity & { repeatedTimesInDifferentSources: number; isFromVerifiedSource: boolean }
  >()

  for (const cache of caches) {
    // cache rows may carry no data; there is nothing to normalize in that case
    if (!cache.data || !this.alsoFindInputsInSourceCaches.includes(cache.source)) {
      continue
    }

    const service = EnrichmentSourceServiceFactory.getEnrichmentSourceService(
      cache.source,
      this.log,
    )
    const normalized = service.normalize(cache.data) as IMemberEnrichmentDataNormalized | null

    // only the first linkedin identity of a cache row is taken into account
    const identity = normalized?.identities?.find((i) => i.platform === PlatformType.LINKEDIN)
    if (!identity) {
      continue
    }

    const known = identitiesByValue.get(identity.value)
    if (known) {
      known.repeatedTimesInDifferentSources += 1
    } else {
      identitiesByValue.set(identity.value, {
        ...identity,
        repeatedTimesInDifferentSources: 1,
        isFromVerifiedSource: false,
      })
    }
  }

  // also add the linkedin identity from the input
  if (input.linkedin?.value && input.linkedin.verified) {
    const known = identitiesByValue.get(input.linkedin.value)
    if (known) {
      known.repeatedTimesInDifferentSources += 1
      known.isFromVerifiedSource = true
    } else {
      identitiesByValue.set(input.linkedin.value, {
        ...input.linkedin,
        repeatedTimesInDifferentSources: 1,
        isFromVerifiedSource: true,
      })
    }
  }

  return Array.from(identitiesByValue.values())
}

/**
 * Fetches ProgAI profiles for every consumable linkedin identity of the member.
 *
 * A fetched profile is dropped when its id equals the id already stored in the
 * member's PROGAI cache row, so the same profile is not saved again under this source.
 *
 * @param input - enrichment input of the member
 * @returns the fetched profiles, each tagged with metadata about the identity that
 *   produced it, or null when no new profile was found
 */
async getData(
  input: IEnrichmentSourceInput,
): Promise<IMemberEnrichmentDataProgAILinkedinScraper[] | null> {
  const caches = await findMemberEnrichmentCacheForAllSources(input.memberId)
  const consumableIdentities = await this.findConsumableLinkedinIdentities(input, caches)

  // the existing progai cache does not change between identities: look it up once
  const existingProgaiCache = caches.find((c) => c.source === MemberEnrichmentSource.PROGAI)
  const cachedProgaiProfile = existingProgaiCache?.data as IMemberEnrichmentDataProgAI | undefined

  const profiles: IMemberEnrichmentDataProgAILinkedinScraper[] = []

  // requests are issued one after another, as before
  for (const identity of consumableIdentities) {
    const data = await this.getDataUsingLinkedinHandle(identity.value)
    if (!data) {
      continue
    }

    // we don't want to reinforce the cache with the same data, only save to cache
    // if a new profile is returned from progai
    // NOTE(review): loose equality kept from the original comparison; confirm the id types match
    if (cachedProgaiProfile && cachedProgaiProfile.id == data.id) {
      continue
    }

    profiles.push({
      ...data,
      metadata: {
        repeatedTimesInDifferentSources: identity.repeatedTimesInDifferentSources,
        isFromVerifiedSource: identity.isFromVerifiedSource,
      },
    })
  }

  return profiles.length > 0 ? profiles : null
}

/**
 * Requests a profile from the ProgAI `get_profile` endpoint for a linkedin handle.
 *
 * The base url and api key are read from `CROWD_ENRICHMENT_PROGAI_URL` and
 * `CROWD_ENRICHMENT_PROGAI_API_KEY`. Request errors are not caught here.
 *
 * @param handle - linkedin handle, appended to `https://linkedin.com/in/`
 * @returns the `profile` of the response body, or null when it is missing
 */
private async getDataUsingLinkedinHandle(handle: string): Promise<IMemberEnrichmentDataProgAI> {
  const baseUrl = process.env['CROWD_ENRICHMENT_PROGAI_URL']
  const apiKey = process.env['CROWD_ENRICHMENT_PROGAI_API_KEY']

  const { data: body } = await axios({
    method: 'get',
    url: `${baseUrl}/get_profile`,
    params: {
      linkedin_url: `https://linkedin.com/in/${handle}`,
      with_emails: true,
      api_key: apiKey,
    },
    headers: {},
  })

  const payload: IMemberEnrichmentDataProgAIResponse = body
  return payload?.profile || null
}

/**
 * Normalizes scraped profiles by reusing the PROGAI source normalizer and carrying
 * each profile's `metadata` over to the normalized result.
 *
 * @param profiles - profiles as returned by `getData`; null or empty yields null
 * @returns one normalized entry per profile, or null when there is nothing to normalize
 */
normalize(
  profiles: IMemberEnrichmentDataProgAILinkedinScraper[],
): IMemberEnrichmentDataNormalized[] {
  // getData resolves to null when nothing was found; treat null and empty alike
  if (!profiles || profiles.length === 0) {
    return null
  }

  const progaiService = EnrichmentSourceServiceFactory.getEnrichmentSourceService(
    MemberEnrichmentSource.PROGAI,
    this.log,
  )

  return profiles.map((profile) => ({
    ...(progaiService.normalize(profile) as IMemberEnrichmentDataNormalized),
    metadata: profile.metadata,
  }))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { IMemberEnrichmentDataProgAI } from '../progai/types'

/**
 * ProgAI profile data extended with metadata about the linkedin identity
 * that was used to fetch the profile.
 */
export interface IMemberEnrichmentDataProgAILinkedinScraper extends IMemberEnrichmentDataProgAI {
metadata: {
// how many places (source cache rows and the member's own verified identity) yielded the identity
repeatedTimesInDifferentSources: number
// true when the identity matched the verified linkedin identity of the enrichment input
isFromVerifiedSource: boolean
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ export default class EnrichmentServiceProgAI extends LoggerBase implements IEnri
super(log)
}

isEnrichableBySource(input: IEnrichmentSourceInput): boolean {
async isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean> {
const enrichableUsingGithubHandle = !!input.github?.value
const enrichableUsingEmail = this.alsoUseEmailIdentitiesForEnrichment && !!input.email?.value
return enrichableUsingGithubHandle || enrichableUsingEmail
Expand Down Expand Up @@ -309,47 +309,36 @@ export default class EnrichmentServiceProgAI extends LoggerBase implements IEnri
}

async getDataUsingGitHubHandle(githubUsername: string): Promise<IMemberEnrichmentDataProgAI> {
let response: IMemberEnrichmentDataProgAIResponse

try {
const url = `${process.env['CROWD_ENRICHMENT_PROGAI_URL']}/get_profile`
const config = {
method: 'get',
url,
params: {
github_handle: githubUsername,
with_emails: true,
api_key: process.env['CROWD_ENRICHMENT_PROGAI_API_KEY'],
},
headers: {},
}

response = (await axios(config)).data
} catch (err) {
throw new Error(err)
const url = `${process.env['CROWD_ENRICHMENT_PROGAI_URL']}/get_profile`
const config = {
method: 'get',
url,
params: {
github_handle: githubUsername,
with_emails: true,
api_key: process.env['CROWD_ENRICHMENT_PROGAI_API_KEY'],
},
headers: {},
}

return response.profile
const response: IMemberEnrichmentDataProgAIResponse = (await axios(config)).data
return response?.profile || null
Comment on lines +312 to +325
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling and request timeout.

The HTTP request lacks error handling and timeout configuration, which could lead to unhandled exceptions or hanging requests.

Consider applying these improvements:

   async getDataUsingGitHubHandle(githubUsername: string): Promise<IMemberEnrichmentDataProgAI> {
     const url = `${process.env['CROWD_ENRICHMENT_PROGAI_URL']}/get_profile`
     const config = {
       method: 'get',
       url,
       params: {
         github_handle: githubUsername,
         with_emails: true,
         api_key: process.env['CROWD_ENRICHMENT_PROGAI_API_KEY'],
       },
-      headers: {},
+      headers: {},
+      timeout: 5000, // 5 seconds timeout
     }
 
-    const response: IMemberEnrichmentDataProgAIResponse = (await axios(config)).data
-    return response?.profile || null
+    try {
+      const response = await axios<IMemberEnrichmentDataProgAIResponse>(config)
+      return response.data?.profile || null
+    } catch (error) {
+      this.log.error('Failed to fetch GitHub profile from ProgAI', {
+        error,
+        githubUsername,
+      })
+      return null
+    }
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const url = `${process.env['CROWD_ENRICHMENT_PROGAI_URL']}/get_profile`
const config = {
method: 'get',
url,
params: {
github_handle: githubUsername,
with_emails: true,
api_key: process.env['CROWD_ENRICHMENT_PROGAI_API_KEY'],
},
headers: {},
}
return response.profile
const response: IMemberEnrichmentDataProgAIResponse = (await axios(config)).data
return response?.profile || null
const url = `${process.env['CROWD_ENRICHMENT_PROGAI_URL']}/get_profile`
const config = {
method: 'get',
url,
params: {
github_handle: githubUsername,
with_emails: true,
api_key: process.env['CROWD_ENRICHMENT_PROGAI_API_KEY'],
},
headers: {},
timeout: 5000, // 5 seconds timeout
}
try {
const response = await axios<IMemberEnrichmentDataProgAIResponse>(config)
return response.data?.profile || null
} catch (error) {
this.log.error('Failed to fetch GitHub profile from ProgAI', {
error,
githubUsername,
})
return null
}

}

async getDataUsingEmailAddress(email: string): Promise<IMemberEnrichmentDataProgAI> {
try {
const url = `${process.env['CROWD_ENRICHMENT_PROGAI_URL']}/get_profile`
const config = {
method: 'get',
url,
params: {
email,
with_emails: true,
api_key: process.env['CROWD_ENRICHMENT_PROGAI_API_KEY'],
},
headers: {},
}

const response = (await axios(config)).data
return response.profile
} catch (err) {
throw new Error(err)
const url = `${process.env['CROWD_ENRICHMENT_PROGAI_URL']}/get_profile`
const config = {
method: 'get',
url,
params: {
email,
with_emails: true,
api_key: process.env['CROWD_ENRICHMENT_PROGAI_API_KEY'],
},
headers: {},
}

const response = (await axios(config)).data
return response?.profile || null
Comment on lines +329 to +342
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Add error handling and reduce code duplication.

The method has similar issues as getDataUsingGitHubHandle and contains duplicated HTTP request logic.

  1. Add error handling and timeout:
   async getDataUsingEmailAddress(email: string): Promise<IMemberEnrichmentDataProgAI> {
     const url = `${process.env['CROWD_ENRICHMENT_PROGAI_URL']}/get_profile`
     const config = {
       method: 'get',
       url,
       params: {
         email,
         with_emails: true,
         api_key: process.env['CROWD_ENRICHMENT_PROGAI_API_KEY'],
       },
-      headers: {},
+      headers: {},
+      timeout: 5000, // 5 seconds timeout
     }
 
-    const response = (await axios(config)).data
-    return response?.profile || null
+    try {
+      const response = await axios<IMemberEnrichmentDataProgAIResponse>(config)
+      return response.data?.profile || null
+    } catch (error) {
+      this.log.error('Failed to fetch email profile from ProgAI', {
+        error,
+        email,
+      })
+      return null
+    }
   }
  2. Consider extracting the common HTTP request logic:
private async fetchProfile(params: Record<string, string>): Promise<IMemberEnrichmentDataProgAI> {
  const config = {
    method: 'get',
    url: `${process.env['CROWD_ENRICHMENT_PROGAI_URL']}/get_profile`,
    params: {
      ...params,
      with_emails: true,
      api_key: process.env['CROWD_ENRICHMENT_PROGAI_API_KEY'],
    },
    headers: {},
    timeout: 5000,
  }

  try {
    const response = await axios<IMemberEnrichmentDataProgAIResponse>(config)
    return response.data?.profile || null
  } catch (error) {
    this.log.error('Failed to fetch profile from ProgAI', {
      error,
      params,
    })
    return null
  }
}

Then update both methods to use it:

async getDataUsingGitHubHandle(githubUsername: string): Promise<IMemberEnrichmentDataProgAI> {
  return this.fetchProfile({ github_handle: githubUsername })
}

async getDataUsingEmailAddress(email: string): Promise<IMemberEnrichmentDataProgAI> {
  return this.fetchProfile({ email })
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export default class EnrichmentServiceSerpApi extends LoggerBase implements IEnr
super(log)
}

isEnrichableBySource(input: IEnrichmentSourceInput): boolean {
async isEnrichableBySource(input: IEnrichmentSourceInput): Promise<boolean> {
const displayNameSplit = input.displayName?.split(' ')
return (
displayNameSplit?.length > 1 &&
Expand Down Expand Up @@ -126,7 +126,7 @@ export default class EnrichmentServiceSerpApi extends LoggerBase implements IEnr
platform: PlatformType.LINKEDIN,
type: MemberIdentityType.USERNAME,
verified: false,
value: this.normalizeLinkedUrl(data.linkedinUrl),
value: this.normalizeLinkedUrl(data.linkedinUrl).split('/').pop(),
},
],
}
Expand Down
Loading