Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
"script:cache-dashboard": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/cache-dashboard.ts",
"script:purge-tenants-and-data": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/purge-tenants-and-data.ts",
"script:import-lfx-memberships": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/import-lfx-memberships.ts",
"script:fix-missing-org-displayName": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-missing-org-displayName.ts"
"script:fix-missing-org-displayName": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-missing-org-displayName.ts",
"script:syncActivities": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/syncActivities.ts"
},
"dependencies": {
"@aws-sdk/client-comprehend": "^3.159.0",
Expand Down
2 changes: 2 additions & 0 deletions backend/src/bin/jobs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import refreshGithubRepoSettings from './refreshGithubRepoSettings'
import refreshGitlabToken from './refreshGitlabToken'
import refreshGroupsioToken from './refreshGroupsioToken'
import refreshMaterializedViews from './refreshMaterializedViews'
import syncActivitiesJob from './syncActivities'

const jobs: CrowdJob[] = [
integrationTicks,
Expand All @@ -18,6 +19,7 @@ const jobs: CrowdJob[] = [
refreshGitlabToken,
refreshGithubRepoSettings,
autoImportGroups,
syncActivitiesJob,
]

export default jobs
141 changes: 141 additions & 0 deletions backend/src/bin/jobs/syncActivities.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import cronGenerator from 'cron-time-generator'

import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database'
import { IDbActivityCreateData } from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data'
import ActivityRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo'
import { QueryExecutor, formatQuery, pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { Logger, logExecutionTimeV2, timer } from '@crowd/logging'
import { getClientSQL } from '@crowd/questdb'
import { PlatformType } from '@crowd/types'

import { DB_CONFIG } from '@/conf'

import { CrowdJob } from '../../types/jobTypes'

/**
 * Resolves the timestamp from which activities should be synced.
 *
 * A non-empty `maxUpdatedAt` supplied by the caller is returned unchanged and no query is
 * issued. Otherwise the newest `"updatedAt"` found in the `activities` table reachable through
 * `pgQx` is used. When that lookup yields nothing (no row, or MAX() is NULL because the table
 * is empty) the Unix epoch is returned, so the declared `Promise<string>` contract always holds.
 *
 * @param pgQx - query executor used to look up the latest `"updatedAt"`
 * @param maxUpdatedAt - optional explicit starting point
 * @returns the timestamp to start syncing from
 */
async function decideUpdatedAt(pgQx: QueryExecutor, maxUpdatedAt?: string): Promise<string> {
  if (maxUpdatedAt) {
    return maxUpdatedAt
  }

  const result = await pgQx.selectOne('SELECT MAX("updatedAt") AS "maxUpdatedAt" FROM activities')

  // MAX() over an empty table is NULL; without a fallback the caller would receive
  // `undefined` and build its queries around a missing value.
  return result?.maxUpdatedAt ?? new Date(0).toISOString()
}

/**
 * Counts the rows of `activities` that match `whereClause`.
 *
 * @param qdbQx - query executor the count is run against
 * @param whereClause - already-formatted SQL condition, interpolated verbatim after `WHERE`
 * @returns the number of matching rows as a JavaScript number (0 when no row is returned)
 */
async function getTotalActivities(qdbQx: QueryExecutor, whereClause: string): Promise<number> {
  const row = await qdbQx.selectOne(
    `SELECT COUNT(1) AS "totalActivities" FROM activities WHERE ${whereClause}`,
  )

  // COUNT() is a 64-bit integer; PostgreSQL-protocol drivers may hand such values back as
  // strings, so coerce explicitly to honour the declared numeric return type.
  return Number(row?.totalActivities ?? 0)
}

/**
 * Builds the SQL condition selecting rows whose `"updatedAt"` is strictly greater than the
 * given value. The value is bound through `formatQuery` rather than concatenated by hand.
 *
 * @param updatedAt - lower bound (exclusive) for `"updatedAt"`
 * @returns the formatted condition, ready to be placed after `WHERE`
 */
function createWhereClause(updatedAt: string): string {
  const template = '"updatedAt" > $(updatedAt)'
  const params = { updatedAt }

  return formatQuery(template, params)
}

/**
 * Writes one batch of activities through `activityRepo`, one activity at a time and in order.
 *
 * For each activity the repository is asked whether a row with the same id exists: if so the
 * row is updated with the activity's data, otherwise the activity is inserted.
 *
 * @param activityRepo - repository used for the existence check, update and insert
 * @param activities - activities to write
 * @returns how many activities were inserted and how many were updated
 */
async function syncActivitiesBatch(
  activityRepo: ActivityRepository,
  activities: IDbActivityCreateData[],
) {
  let inserted = 0
  let updated = 0

  for (const row of activities) {
    const alreadyStored = await activityRepo.existsWithId(row.id)

    if (!alreadyStored) {
      await activityRepo.rawInsert(row)
      inserted += 1
      continue
    }

    // The update payload is the full activity with `platform` narrowed to PlatformType.
    const payload = { ...row, platform: row.platform as PlatformType }
    await activityRepo.rawUpdate(row.id, payload)
    updated += 1
  }

  return { inserted, updated }
}

/**
 * Copies activities from the SQL client returned by `getClientSQL()` (from `@crowd/questdb`)
 * into the database configured by `DB_CONFIG`, connecting to `DB_CONFIG.writeHost`.
 *
 * The starting point is `maxUpdatedAt` when given, otherwise whatever `decideUpdatedAt`
 * resolves from the target database. Rows with a greater `"updatedAt"` are then read in
 * batches of up to 1000, ordered by `"updatedAt"`, and written via `syncActivitiesBatch`
 * until a read returns no rows. Progress is logged after every batch.
 *
 * @param logger - logger used for progress and timing output
 * @param maxUpdatedAt - optional explicit starting point; only rows updated after it are synced
 */
export async function syncActivities(logger: Logger, maxUpdatedAt?: string) {
  logger.info(`Syncing activities from ${maxUpdatedAt}`)

  const qdb = await getClientSQL()
  const db = await getDbConnection({
    host: DB_CONFIG.writeHost,
    port: DB_CONFIG.port,
    database: DB_CONFIG.database,
    user: DB_CONFIG.username,
    password: DB_CONFIG.password,
  })

  const pgQx = pgpQx(db)
  const qdbQx = pgpQx(qdb)
  const activityRepo = new ActivityRepository(new DbStore(logger, db, undefined, true), logger)

  // Cursor: advanced to the last row's "updatedAt" after every batch.
  let updatedAt = await logExecutionTimeV2(
    () => decideUpdatedAt(pgQx, maxUpdatedAt),
    logger,
    'decide updatedAt',
  )

  // Only used for the up-front count; the batch query below binds the moving cursor itself.
  const whereClause = createWhereClause(updatedAt)

  const totalActivities = await logExecutionTimeV2(
    () => getTotalActivities(qdbQx, whereClause),
    logger,
    'get total activities',
  )

  let counter = 0

  const totalTimer = timer(logger, `sync ${totalActivities} activities`)
  // eslint-disable-next-line no-constant-condition
  while (true) {
    // NOTE(review): the cursor is a strict `>` on "updatedAt" alone. If several rows share the
    // "updatedAt" of the last row in a full batch, the ones not included in that batch are
    // skipped by the next query. A tie-breaker column would close that gap — confirm whether
    // equal timestamps can occur in practice.
    const result = await logExecutionTimeV2(
      // eslint-disable-next-line @typescript-eslint/no-loop-func
      () =>
        qdbQx.select(
          `
          SELECT *
          FROM activities
          WHERE "updatedAt" > $(updatedAt)
          ORDER BY "updatedAt"
          LIMIT 1000;
          `,
          { updatedAt },
        ),
      logger,
      `getting activities with updatedAt > ${updatedAt}`,
    )

    if (result.length === 0) {
      break
    }

    // Separate name so the per-batch timer does not shadow the overall one.
    const batchTimer = timer(logger)
    const { inserted, updated } = await syncActivitiesBatch(activityRepo, result)
    batchTimer.end(`Inserting ${inserted} and updating ${updated} activities`)

    counter += inserted + updated
    // The total was counted before the loop started; rows that appear afterwards can make it
    // 0 while batches still arrive, so avoid dividing by zero.
    const pct = totalActivities > 0 ? Math.round((counter / totalActivities) * 100) : 100
    logger.info(`synced ${counter} activities out of ${totalActivities}. That's ${pct}%`)

    updatedAt = result[result.length - 1].updatedAt
  }

  totalTimer.end()
}

// every day
const dailyCronTime = cronGenerator.every(1).days()

/**
 * Trigger handler: runs the activity sync without an explicit starting point, so
 * `syncActivities` resolves it on its own.
 */
async function runSyncActivities(logger: Logger): Promise<void> {
  await syncActivities(logger)
}

/** Scheduled job definition for the activity sync. */
const job: CrowdJob = {
  name: 'Sync Activities',
  cronTime: dailyCronTime,
  onTrigger: runSyncActivities,
}

export default job
18 changes: 18 additions & 0 deletions backend/src/bin/scripts/syncActivities.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { getServiceChildLogger } from '@crowd/logging'

import { syncActivities } from '../jobs/syncActivities'

const logger = getServiceChildLogger('syncActivities')

// Script entry point. Usage: pass the starting `updatedAt` value as the first CLI argument.
// Exit code is 0 on success and 1 when the argument is missing or the sync fails.
setImmediate(async () => {
  const updatedAt = process.argv[2]

  if (!updatedAt) {
    logger.error('No updatedAt provided')
    process.exit(1)
  }

  try {
    await syncActivities(logger, updatedAt)
  } catch (err) {
    // Without this the rejection would escape the async callback unhandled and never be
    // reported through the service logger.
    const details = err instanceof Error ? (err.stack ?? err.message) : String(err)
    logger.error(`Failed to sync activities: ${details}`)
    process.exit(1)
  }

  process.exit(0)
})
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Adds an index named activities_updated_at on activities("updatedAt").
-- IF NOT EXISTS makes the statement a no-op when an index with that name is already present.
-- NOTE(review): in PostgreSQL, CREATE INDEX CONCURRENTLY cannot run inside a transaction
-- block — confirm the migration runner executes this file outside of one.
CREATE INDEX CONCURRENTLY IF NOT EXISTS activities_updated_at ON activities ("updatedAt");
2 changes: 1 addition & 1 deletion backend/src/services/activityService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ export default class ActivityService extends LoggerBase {
)

record = await ActivityRepository.create(data, repositoryOptions)
await insertActivities([{ ...data, id: record.id }])
await insertActivities([{ ...data, id: record.id }], true)

// Only track the activity's platform, timestamp and memberId. It is completely anonymous.
telemetryTrack(
Expand Down
Loading