From 7025b4ddd75814c7191f402549f7474cf944a4e2 Mon Sep 17 00:00:00 2001 From: Misha Savelyev Date: Mon, 28 Oct 2024 15:20:42 +0000 Subject: [PATCH 01/11] Format file --- .../src/conversations/sql.ts | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/services/libs/data-access-layer/src/conversations/sql.ts b/services/libs/data-access-layer/src/conversations/sql.ts index 79580e4bb1..f5aa27dde5 100644 --- a/services/libs/data-access-layer/src/conversations/sql.ts +++ b/services/libs/data-access-layer/src/conversations/sql.ts @@ -46,10 +46,10 @@ export async function getConversationById( "createdById", "updatedById" from conversations - where - id = $(id) and - "tenantId" = $(tenantId) and - "segmentId" in ($(segmentIds:csv)) and + where + id = $(id) and + "tenantId" = $(tenantId) and + "segmentId" in ($(segmentIds:csv)) and "deletedAt" is null `, { @@ -339,12 +339,12 @@ export async function doesConversationWithSlugExists( ): Promise { const results = await conn.any( ` - select id - from conversations - where - "tenantId" = $(tenantId) and - "segmentId" = $(segmentId) and - slug = $(slug) and + select id + from conversations + where + "tenantId" = $(tenantId) and + "segmentId" = $(segmentId) and + slug = $(slug) and "deletedAt" is null `, { @@ -470,9 +470,9 @@ export async function queryConversations( group by "conversationId" ) select - from conversations c + from conversations c inner join activity_data a on a."conversationId" = c.id - where c."deletedAt" is null and + where c."deletedAt" is null and c."tenantId" = $(tenantId) and c."segmentId" in ($(segmentIds:csv)) and ${filterString} @@ -496,9 +496,9 @@ export async function queryConversations( group by "conversationId" ) select - from conversations c + from conversations c inner join activity_data a on a."conversationId" = c.id - where c."deletedAt" is null and + where c."deletedAt" is null and c."tenantId" = $(tenantId) and c."segmentId" in ($(segmentIds:csv)) and ${filterString} @@ -520,7 +520,7 @@ export async function queryConversations( let query = `${baseQuery.replace( '', ` - c.id, + c.id, a.channel, c."createdAt", a."memberCount", From 437d3ab8ed75413fe3f045de19d7c404d9660c66 Mon Sep 17 00:00:00 2001 From: Misha Savelyev Date: Mon, 28 Oct 2024 15:21:04 +0000 Subject: [PATCH 02/11] Ignore empty options in orderBy of conversations/query --- services/libs/data-access-layer/src/conversations/sql.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/services/libs/data-access-layer/src/conversations/sql.ts b/services/libs/data-access-layer/src/conversations/sql.ts index f5aa27dde5..013a6c0461 100644 --- a/services/libs/data-access-layer/src/conversations/sql.ts +++ b/services/libs/data-access-layer/src/conversations/sql.ts @@ -391,6 +391,10 @@ export async function queryConversations( const parsedOrderBys = [] for (const orderByPart of arg.orderBy) { + if (orderByPart.trim().length === 0) { + continue + } + const orderByParts = orderByPart.split('_') const direction = orderByParts[1].toLowerCase() switch (orderByParts[0]) { From 1c2262b39b5f7b3f9ee01a5cb54478b74a9c120b Mon Sep 17 00:00:00 2001 From: Misha Savelyev Date: Tue, 29 Oct 2024 10:24:52 +0000 Subject: [PATCH 03/11] Remove unused file --- .../apps/search_sync_worker/activity.repo.ts | 36 ------------------- 1 file changed, 36 deletions(-) delete mode 100644 services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts diff --git a/services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts b/services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts deleted file mode 100644 index 5465a608d0..0000000000 --- a/services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts +++ /dev/null @@ -1,36 +0,0 @@ -import { DbStore, RepositoryBase } from '@crowd/database' -import { Logger } from '@crowd/logging' - -import { IDbActivityId } from './activity.data' - -export class ActivityRepository extends RepositoryBase { - constructor(dbStore: DbStore, parentLog: Logger) { - super(dbStore, parentLog) - } - - public async checkActivitiesExist(activityIds: string[]): Promise { - const results = await this.db().any( - ` - select id - from activities where id in ($(activityIds:csv)) and "deletedAt" is null - `, - { - activityIds, - }, - ) - - return results - } - - public async getTenantIds(): Promise { - const results = await this.db().any( - `select "tenantId" - from activities - where "deletedAt" is null - group by "tenantId" - order by count(id) asc`, - ) - - return results.map((r) => r.tenantId) - } -} From cd8f349086c9fc17b637a098496408376783b973 Mon Sep 17 00:00:00 2001 From: Misha Savelyev Date: Mon, 4 Nov 2024 07:30:59 +0000 Subject: [PATCH 04/11] Do not write/use activities from postgres from data-sink-worker --- .../src/service/activity.data.ts | 1 + .../src/service/activity.service.ts | 105 ++++-------------- .../src/activities/update.ts | 34 ++++-- .../data_sink_worker/repo/activity.data.ts | 2 + 4 files changed, 49 insertions(+), 93 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/activity.data.ts b/services/apps/data_sink_worker/src/service/activity.data.ts index d4a8f73faa..0f802b1245 100644 --- a/services/apps/data_sink_worker/src/service/activity.data.ts +++ b/services/apps/data_sink_worker/src/service/activity.data.ts @@ -1,6 +1,7 @@ import { PlatformType } from '@crowd/types' export interface IActivityCreateData { + id: string type: string isContribution: boolean score: number diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 0662b1545f..56c3c142de 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -1,15 +1,20 @@ import isEqual from 'lodash.isequal' import mergeWith from 'lodash.mergewith' -import { EDITION, escapeNullByte, isObjectEmpty, singleOrDefault } from '@crowd/common' +import { + EDITION, + escapeNullByte, + generateUUIDv4, + isObjectEmpty, + singleOrDefault, +} from '@crowd/common' import { SearchSyncWorkerEmitter } from '@crowd/common_services' -import { insertActivities } from '@crowd/data-access-layer' +import { insertActivities, queryActivities } from '@crowd/data-access-layer' import { DbStore, arePrimitivesDbEqual } from '@crowd/data-access-layer/src/database' import { IDbActivity, IDbActivityUpdateData, } from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data' -import ActivityRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo' import GithubReposRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/githubRepos.repo' import GitlabReposRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/gitlabRepos.repo' import IntegrationRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/integration.repo' @@ -66,7 +71,6 @@ export default class ActivityService extends LoggerBase { }) const id = await this.pgStore.transactionally(async (txStore) => { - const txRepo = new ActivityRepository(txStore, this.log) const txSettingsRepo = new SettingsRepository(txStore, this.log) await txSettingsRepo.createActivityType( @@ -85,33 +89,11 @@ export default class ActivityService extends LoggerBase { ) } - const id = await txRepo.create(tenantId, segmentId, { - type: activity.type, - timestamp: activity.timestamp.toISOString(), - platform: activity.platform, - isContribution: activity.isContribution, - score: activity.score, - sourceId: activity.sourceId, - sourceParentId: activity.sourceParentId, - tenantId, - memberId: activity.memberId, - username: activity.username, - sentiment, - attributes: activity.attributes || {}, - body: escapeNullByte(activity.body), - title: escapeNullByte(activity.title), - channel: activity.channel, - url: activity.url, - organizationId: activity.organizationId, - objectMemberId: activity.objectMemberId, - objectMemberUsername: activity.objectMemberUsername, - }) - this.log.debug('Creating an activity in QuestDB!') try { await insertActivities([ { - id, + id: activity.id, timestamp: activity.timestamp.toISOString(), platform: activity.platform, type: activity.type, @@ -197,7 +179,6 @@ export default class ActivityService extends LoggerBase { try { let toUpdate: IDbActivityUpdateData const updated = await this.pgStore.transactionally(async (txStore) => { - const txRepo = new ActivityRepository(txStore, this.log) const txSettingsRepo = new SettingsRepository(txStore, this.log) toUpdate = await this.mergeActivityData(activity, original) @@ -222,25 +203,6 @@ export default class ActivityService extends LoggerBase { if (!isObjectEmpty(toUpdate)) { this.log.debug({ activityId: id }, 'Updating activity.') - await txRepo.update(id, tenantId, segmentId, { - tenantId: tenantId, - segmentId: segmentId, - type: toUpdate.type || original.type, - isContribution: toUpdate.isContribution || original.isContribution, - score: toUpdate.score || original.score, - sourceId: toUpdate.sourceId || original.sourceId, - sourceParentId: toUpdate.sourceParentId || original.sourceParentId, - memberId: toUpdate.memberId || original.memberId, - username: toUpdate.username || original.username, - sentiment: toUpdate.sentiment || original.sentiment, - attributes: toUpdate.attributes || original.attributes, - body: escapeNullByte(toUpdate.body || original.body), - title: escapeNullByte(toUpdate.title || original.title), - channel: toUpdate.channel || original.channel, - url: toUpdate.url || original.url, - organizationId: toUpdate.organizationId || original.organizationId, - platform: toUpdate.platform || (original.platform as PlatformType), - }) // use insert instead of update to avoid using pg protocol with questdb try { @@ -277,28 +239,6 @@ export default class ActivityService extends LoggerBase { throw error } - // await updateActivity(this.qdbStore.connection(), id, { - // tenantId: tenantId, - // segmentId: segmentId, - // type: toUpdate.type || original.type, - // isContribution: toUpdate.isContribution || original.isContribution, - // score: toUpdate.score || original.score, - // sourceId: toUpdate.sourceId || original.sourceId, - // sourceParentId: toUpdate.sourceParentId || original.sourceParentId, - // memberId: toUpdate.memberId || original.memberId, - // username: toUpdate.username || original.username, - // sentiment: toUpdate.sentiment || original.sentiment, - // attributes: toUpdate.attributes || original.attributes, - // body: escapeNullByte(toUpdate.body || original.body), - // title: escapeNullByte(toUpdate.title || original.title), - // channel: toUpdate.channel || original.channel, - // url: toUpdate.url || original.url, - // organizationId: toUpdate.organizationId || original.organizationId, - // platform: toUpdate.platform || (original.platform as PlatformType), - // isBotActivity: memberInfo.isBot, - // isTeamMemberActivity: memberInfo.isTeamMember, - // }) - return true } else { this.log.debug({ activityId: id }, 'No changes to update in an activity.') @@ -661,7 +601,6 @@ export default class ActivityService extends LoggerBase { await this.pgStore.transactionally(async (txStore) => { try { - const txRepo = new ActivityRepository(txStore, this.log) const txMemberRepo = new MemberRepository(txStore, this.log) const txMemberService = new MemberService( txStore, @@ -705,14 +644,21 @@ export default class ActivityService extends LoggerBase { } // find existing activity - const dbActivity = await txRepo.findExisting( + const { + rows: [dbActivity], + } = await queryActivities(this.qdbStore.connection(), { tenantId, - segmentId, - activity.sourceId, - platform, - activity.type, - activity.channel, - ) + segmentIds: [segmentId], + filter: { + and: [ + { sourceId: { eq: activity.sourceId } }, + { platform: { eq: platform } }, + { type: { eq: activity.type } }, + { channel: { eq: activity.channel } }, + ], + }, + limit: 1, + }) if (dbActivity && dbActivity?.deletedAt) { // we found an existing activity but it's deleted - nothing to do here @@ -753,8 +699,6 @@ export default class ActivityService extends LoggerBase { 'Exiting activity has a memberId that does not match the memberId for the platform:username identity! Deleting the activity!', ) - // delete activity - await txRepo.delete(dbActivity.id) createActivity = true } @@ -869,8 +813,6 @@ export default class ActivityService extends LoggerBase { 'Exiting activity has a objectMemberId that does not match the object member for the platform:username identity! Deleting the activity!', ) - // delete activity - await txRepo.delete(dbActivity.id) createActivity = true } @@ -1139,6 +1081,7 @@ export default class ActivityService extends LoggerBase { tenantId, segmentId, { + id: dbActivity?.id ?? generateUUIDv4(), type: activity.type, platform, timestamp: new Date(activity.timestamp), diff --git a/services/libs/data-access-layer/src/activities/update.ts b/services/libs/data-access-layer/src/activities/update.ts index 773d36b17d..2d4fb05a1e 100644 --- a/services/libs/data-access-layer/src/activities/update.ts +++ b/services/libs/data-access-layer/src/activities/update.ts @@ -10,20 +10,14 @@ import { insertActivities } from './ilp' const logger = getServiceChildLogger('activities.update') -export async function updateActivities( +export async function streamActivities( qdb: DbConnOrTx, - mapActivity: (activity: IDbActivityCreateData) => Promise>, + onActivity: (activity: IDbActivityCreateData) => Promise, where: string, params?: Record, ): Promise<{ processed: number; duration: number }> { const whereClause = formatQuery(where, params) - const qs = new QueryStream( - ` - SELECT * - FROM activities - WHERE ${whereClause} - `, - ) + const qs = new QueryStream(`SELECT * FROM activities WHERE ${whereClause}`) const t = timer(logger, `query activities with ${whereClause}`) const res = await qdb.stream(qs, async (stream) => { @@ -32,13 +26,29 @@ export async function updateActivities( const activity = item as unknown as IDbActivityCreateData + await onActivity(activity) + } + }) + return res +} + +export async function updateActivities( + qdb: DbConnOrTx, + mapActivity: (activity: IDbActivityCreateData) => Promise>, + where: string, + params?: Record, +): Promise<{ processed: number; duration: number }> { + return streamActivities( + qdb, + async (activity) => { await insertActivities([ { ...activity, ...(await mapActivity(activity)), }, ]) - } - }) - return res + }, + where, + params, + ) } diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts index 38d7e22017..b296389389 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts @@ -128,6 +128,8 @@ export interface IDbActivityUpdateData { isTeamMemberActivity?: boolean isBotActivity?: boolean updatedById?: string + updatedAt?: string + createdAt?: string } let updateActivityColumnSet: DbColumnSet From d18faf73bedad2e81eac5216096a4fb27c93a488 Mon Sep 17 00:00:00 2001 From: Misha Savelyev Date: Mon, 4 Nov 2024 07:32:32 +0000 Subject: [PATCH 05/11] Sync activities from questdb to postgres periodically --- backend/package.json | 3 +- backend/src/bin/jobs/index.ts | 2 + backend/src/bin/jobs/syncActivities.ts | 141 ++++++++++++++++++ backend/src/bin/scripts/syncActivities.ts | 18 +++ pnpm-lock.yaml | 108 +++++++------- .../data_sink_worker/repo/activity.repo.ts | 21 +++ services/libs/logging/src/utility.ts | 6 +- 7 files changed, 243 insertions(+), 56 deletions(-) create mode 100644 backend/src/bin/jobs/syncActivities.ts create mode 100644 backend/src/bin/scripts/syncActivities.ts diff --git a/backend/package.json b/backend/package.json index 2f2bd276f4..6c7e2be1b2 100644 --- a/backend/package.json +++ b/backend/package.json @@ -41,7 +41,8 @@ "script:cache-dashboard": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/cache-dashboard.ts", "script:purge-tenants-and-data": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/purge-tenants-and-data.ts", "script:import-lfx-memberships": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/import-lfx-memberships.ts", - "script:fix-missing-org-displayName": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-missing-org-displayName.ts" + "script:fix-missing-org-displayName": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-missing-org-displayName.ts", + "script:syncActivities": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/syncActivities.ts" }, "dependencies": { "@aws-sdk/client-comprehend": "^3.159.0", diff --git a/backend/src/bin/jobs/index.ts b/backend/src/bin/jobs/index.ts index 04472c6538..f984ee7e38 100644 --- a/backend/src/bin/jobs/index.ts +++ b/backend/src/bin/jobs/index.ts @@ -8,6 +8,7 @@ import refreshGithubRepoSettings from './refreshGithubRepoSettings' import refreshGitlabToken from './refreshGitlabToken' import refreshGroupsioToken from './refreshGroupsioToken' import refreshMaterializedViews from './refreshMaterializedViews' +import syncActivitiesJob from './syncActivities' const jobs: CrowdJob[] = [ integrationTicks, @@ -18,6 +19,7 @@ const jobs: CrowdJob[] = [ refreshGitlabToken, refreshGithubRepoSettings, autoImportGroups, + syncActivitiesJob, ] export default jobs diff --git a/backend/src/bin/jobs/syncActivities.ts b/backend/src/bin/jobs/syncActivities.ts new file mode 100644 index 0000000000..5708b0bc5a --- /dev/null +++ b/backend/src/bin/jobs/syncActivities.ts @@ -0,0 +1,141 @@ +import cronGenerator from 'cron-time-generator' + +import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' +import { IDbActivityCreateData } from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data' +import ActivityRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo' +import { QueryExecutor, formatQuery, pgpQx } from '@crowd/data-access-layer/src/queryExecutor' +import { Logger, logExecutionTimeV2, timer } from '@crowd/logging' +import { getClientSQL } from '@crowd/questdb' +import { PlatformType } from '@crowd/types' + +import { DB_CONFIG } from '@/conf' + +import { CrowdJob } from '../../types/jobTypes' + +async function decideUpdatedAt(pgQx: QueryExecutor, maxUpdatedAt?: string): Promise { + if (!maxUpdatedAt) { + const result = await pgQx.selectOne('SELECT MAX("updatedAt") AS "maxUpdatedAt" FROM activities') + return result?.maxUpdatedAt + } + + return maxUpdatedAt +} + +async function getTotalActivities(qdbQx: QueryExecutor, whereClause: string): Promise { + const { totalActivities } = await qdbQx.selectOne( + `SELECT COUNT(1) AS "totalActivities" FROM activities WHERE ${whereClause}`, + ) + return totalActivities +} + +function createWhereClause(updatedAt: string): string { + return formatQuery('"updatedAt" > $(updatedAt)', { updatedAt }) +} + +async function syncActivitiesBatch( + activityRepo: ActivityRepository, + activities: IDbActivityCreateData[], +) { + const result = { + inserted: 0, + updated: 0, + } + + for (const activity of activities) { + const existingActivity = await activityRepo.existsWithId(activity.id) + + if (existingActivity) { + await activityRepo.rawUpdate(activity.id, { + ...activity, + platform: activity.platform as PlatformType, + }) + result.updated++ + } else { + await activityRepo.rawInsert(activity) + result.inserted++ + } + } + + return result +} + +export async function syncActivities(logger: Logger, maxUpdatedAt?: string) { + logger.info(`Syncing activities from ${maxUpdatedAt}`) + + const qdb = await getClientSQL() + const db = await getDbConnection({ + host: DB_CONFIG.writeHost, + port: DB_CONFIG.port, + database: DB_CONFIG.database, + user: DB_CONFIG.username, + password: DB_CONFIG.password, + }) + + const pgQx = pgpQx(db) + const qdbQx = pgpQx(qdb) + const activityRepo = new ActivityRepository(new DbStore(logger, db, undefined, true), logger) + + let updatedAt = await logExecutionTimeV2( + () => decideUpdatedAt(pgQx, maxUpdatedAt), + logger, + 'decide updatedAt', + ) + + const whereClause = createWhereClause(updatedAt) + + const totalActivities = await logExecutionTimeV2( + () => getTotalActivities(qdbQx, whereClause), + logger, + 'get total activities', + ) + + let counter = 0 + + const t = timer(logger, `sync ${totalActivities} activities`) + // eslint-disable-next-line no-constant-condition + while (true) { + const result = await logExecutionTimeV2( + // eslint-disable-next-line @typescript-eslint/no-loop-func + () => + qdbQx.select( + ` + SELECT * + FROM activities + WHERE "updatedAt" > $(updatedAt) + ORDER BY "updatedAt" + LIMIT 1000; + `, + { updatedAt }, + ), + logger, + `getting activities with updatedAt > ${updatedAt}`, + ) + + if (result.length === 0) { + break + } + + const t = timer(logger) + const { inserted, updated } = await syncActivitiesBatch(activityRepo, result) + t.end(`Inserting ${inserted} and updating ${updated} activities`) + + counter += inserted + updated + const pct = Math.round((counter / totalActivities) * 100) + logger.info(`synced ${counter} activities out of ${totalActivities}. That's ${pct}%`) + + updatedAt = result[result.length - 1].updatedAt + } + + t.end() +} + +const job: CrowdJob = { + name: 'Sync Activities', + // every day + cronTime: cronGenerator.every(1).days(), + onTrigger: async (logger: Logger) => { + await syncActivities(logger) + }, +} + +export default job diff --git a/backend/src/bin/scripts/syncActivities.ts b/backend/src/bin/scripts/syncActivities.ts new file mode 100644 index 0000000000..1c3dc2dc79 --- /dev/null +++ b/backend/src/bin/scripts/syncActivities.ts @@ -0,0 +1,18 @@ +import { getServiceChildLogger } from '@crowd/logging' + +import { syncActivities } from '../jobs/syncActivities' + +const logger = getServiceChildLogger('syncActivities') + +setImmediate(async () => { + const updatedAt = process.argv[2] + + if (!updatedAt) { + logger.error('No updatedAt provided') + process.exit(1) + } + + await syncActivities(logger, updatedAt) + + process.exit(0) +}) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index da8e3314ae..010f0cf60f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -9990,8 +9990,8 @@ snapshots: dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.572.0 - '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) + '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) + '@aws-sdk/client-sts': 3.572.0 '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -10171,11 +10171,11 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sso-oidc@3.572.0': + '@aws-sdk/client-sso-oidc@3.572.0(@aws-sdk/client-sts@3.572.0)': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) + '@aws-sdk/client-sts': 3.572.0 '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -10214,6 +10214,7 @@ snapshots: '@smithy/util-utf8': 2.3.0 tslib: 2.6.2 transitivePeerDependencies: + - '@aws-sdk/client-sts' - aws-crt '@aws-sdk/client-sso@3.556.0': @@ -10346,11 +10347,11 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sts@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)': + '@aws-sdk/client-sts@3.572.0': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.572.0 + '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -10389,7 +10390,6 @@ snapshots: '@smithy/util-utf8': 2.3.0 tslib: 2.6.2 transitivePeerDependencies: - - '@aws-sdk/client-sso-oidc' - aws-crt '@aws-sdk/core@3.556.0': @@ -10469,7 +10469,7 @@ snapshots: '@aws-sdk/credential-provider-ini@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0)': dependencies: - '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) + '@aws-sdk/client-sts': 3.572.0 '@aws-sdk/credential-provider-env': 3.568.0 '@aws-sdk/credential-provider-process': 3.572.0 '@aws-sdk/credential-provider-sso': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) @@ -10575,7 +10575,7 @@ snapshots: '@aws-sdk/credential-provider-web-identity@3.568.0(@aws-sdk/client-sts@3.572.0)': dependencies: - '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) + '@aws-sdk/client-sts': 3.572.0 '@aws-sdk/types': 3.567.0 '@smithy/property-provider': 2.2.0 '@smithy/types': 2.12.0 @@ -10720,7 +10720,7 @@ snapshots: '@aws-sdk/token-providers@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)': dependencies: - '@aws-sdk/client-sso-oidc': 3.572.0 + '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) '@aws-sdk/types': 3.567.0 '@smithy/property-provider': 2.2.0 '@smithy/shared-ini-file-loader': 2.4.0 @@ -10823,7 +10823,7 @@ snapshots: '@babel/traverse': 7.24.1 '@babel/types': 7.24.0 convert-source-map: 2.0.0 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 gensync: 1.0.0-beta.2 json5: 2.2.3 semver: 6.3.1 @@ -10884,7 +10884,7 @@ snapshots: '@babel/core': 7.24.4 '@babel/helper-compilation-targets': 7.23.6 '@babel/helper-plugin-utils': 7.24.0 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 lodash.debounce: 4.0.8 resolve: 1.22.8 transitivePeerDependencies: @@ -11539,7 +11539,7 @@ snapshots: '@babel/helper-split-export-declaration': 7.22.6 '@babel/parser': 7.24.4 '@babel/types': 7.24.0 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 globals: 11.12.0 transitivePeerDependencies: - supports-color @@ -11554,7 +11554,7 @@ snapshots: '@babel/helper-split-export-declaration': 7.22.6 '@babel/parser': 7.24.4 '@babel/types': 7.24.0 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 globals: 11.12.0 transitivePeerDependencies: - supports-color @@ -11730,7 +11730,7 @@ snapshots: '@eslint/eslintrc@2.1.4': dependencies: ajv: 6.12.6 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 espree: 9.6.1 globals: 13.24.0 ignore: 5.3.1 @@ -11844,7 +11844,7 @@ snapshots: '@humanwhocodes/config-array@0.11.14': dependencies: '@humanwhocodes/object-schema': 2.0.3 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 minimatch: 3.1.2 transitivePeerDependencies: - supports-color @@ -12362,7 +12362,7 @@ snapshots: '@opensearch-project/opensearch@2.11.0': dependencies: aws4: 1.12.0 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 hpagent: 1.2.0 json11: 1.1.2 ms: 2.1.3 @@ -12691,7 +12691,7 @@ snapshots: async: 3.2.5 chalk: 3.0.0 dayjs: 1.8.36 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 eventemitter2: 5.0.1 fast-json-patch: 3.1.1 fclone: 1.0.11 @@ -12711,7 +12711,7 @@ snapshots: '@opencensus/core': 0.0.9 '@opencensus/propagation-b3': 0.0.8 async: 2.6.4 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 eventemitter2: 6.4.9 require-in-the-middle: 5.2.0 semver: 7.5.4 @@ -12724,7 +12724,7 @@ snapshots: '@pm2/js-api@0.8.0(bufferutil@4.0.8)(utf-8-validate@5.0.10)': dependencies: async: 2.6.4 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 eventemitter2: 6.4.9 extrareqp2: 1.0.0(debug@4.3.4) ws: 7.5.9(bufferutil@4.0.8)(utf-8-validate@5.0.10) @@ -12735,7 +12735,7 @@ snapshots: '@pm2/pm2-version-check@1.0.4': dependencies: - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.7 transitivePeerDependencies: - supports-color @@ -13265,7 +13265,7 @@ snapshots: '@superfaceai/parser': 1.2.0 abort-controller: 3.0.0 cross-fetch: 3.1.8(encoding@0.1.13) - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 isomorphic-form-data: 2.0.0 vm2: 3.9.19 transitivePeerDependencies: @@ -13276,7 +13276,7 @@ snapshots: dependencies: '@superfaceai/ast': 1.2.0 '@types/debug': 4.1.12 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 typescript: 4.9.5 transitivePeerDependencies: - supports-color @@ -13671,7 +13671,7 @@ snapshots: '@typescript-eslint/scope-manager': 5.62.0 '@typescript-eslint/type-utils': 5.62.0(eslint@8.57.0)(typescript@5.6.3) '@typescript-eslint/utils': 5.62.0(eslint@8.57.0)(typescript@5.6.3) - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 eslint: 8.57.0 graphemer: 1.4.0 ignore: 5.3.1 @@ -13708,7 +13708,7 @@ snapshots: '@typescript-eslint/scope-manager': 5.62.0 '@typescript-eslint/types': 5.62.0 '@typescript-eslint/typescript-estree': 5.62.0(typescript@5.6.3) - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 eslint: 8.57.0 optionalDependencies: typescript: 5.6.3 @@ -13742,7 +13742,7 @@ snapshots: dependencies: '@typescript-eslint/typescript-estree': 5.62.0(typescript@5.6.3) '@typescript-eslint/utils': 5.62.0(eslint@8.57.0)(typescript@5.6.3) - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 eslint: 8.57.0 tsutils: 3.21.0(typescript@5.6.3) optionalDependencies: @@ -13770,7 +13770,7 @@ snapshots: dependencies: '@typescript-eslint/types': 5.62.0 '@typescript-eslint/visitor-keys': 5.62.0 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 globby: 11.1.0 is-glob: 4.0.3 semver: 7.6.0 @@ -13951,13 +13951,13 @@ snapshots: agent-base@6.0.2: dependencies: - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.7 transitivePeerDependencies: - supports-color agent-base@7.1.1: dependencies: - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.7 transitivePeerDependencies: - supports-color @@ -14939,6 +14939,10 @@ snapshots: optionalDependencies: supports-color: 5.5.0 + debug@4.3.4: + dependencies: + ms: 2.1.2 + debug@4.3.4(supports-color@5.5.0): dependencies: ms: 2.1.2 @@ -15201,7 +15205,7 @@ snapshots: base64id: 2.0.0 cookie: 0.4.2 cors: 2.8.5 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 engine.io-parser: 5.2.2 ws: 8.11.0(bufferutil@4.0.8)(utf-8-validate@5.0.10) transitivePeerDependencies: @@ -15514,7 +15518,7 @@ snapshots: ajv: 6.12.6 chalk: 4.1.2 cross-spawn: 7.0.3 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 doctrine: 3.0.0 escape-string-regexp: 4.0.0 eslint-scope: 7.2.2 @@ -15825,7 +15829,7 @@ snapshots: follow-redirects@1.15.6(debug@4.3.4): optionalDependencies: - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 for-each@0.3.3: dependencies: @@ -16002,7 +16006,7 @@ snapshots: dependencies: basic-ftp: 5.0.5 data-uri-to-buffer: 6.0.2 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.7 fs-extra: 11.2.0 transitivePeerDependencies: - supports-color @@ -16267,7 +16271,7 @@ snapshots: http-proxy-agent@7.0.2: dependencies: agent-base: 7.1.1 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.7 transitivePeerDependencies: - supports-color @@ -16276,14 +16280,14 @@ snapshots: https-proxy-agent@5.0.1: dependencies: agent-base: 6.0.2 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.7 transitivePeerDependencies: - supports-color https-proxy-agent@7.0.4: dependencies: agent-base: 7.1.1 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.7 transitivePeerDependencies: - supports-color @@ -17092,7 +17096,7 @@ snapshots: dependencies: '@types/express': 4.17.21 '@types/jsonwebtoken': 9.0.6 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 jose: 4.15.5 limiter: 1.1.5 lru-memoizer: 2.2.0 @@ -17923,7 +17927,7 @@ snapshots: dependencies: '@tootallnate/quickjs-emscripten': 0.23.0 agent-base: 7.1.1 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.7 get-uri: 6.0.3 http-proxy-agent: 7.0.2 https-proxy-agent: 7.0.4 @@ -18164,7 +18168,7 @@ snapshots: pm2-axon-rpc@0.7.1: dependencies: - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 transitivePeerDependencies: - supports-color @@ -18172,7 +18176,7 @@ snapshots: dependencies: amp: 0.3.1 amp-message: 0.1.2 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 escape-string-regexp: 4.0.0 transitivePeerDependencies: - supports-color @@ -18189,7 +18193,7 @@ snapshots: pm2-sysmonit@1.2.8: dependencies: async: 3.2.5 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 pidusage: 2.0.21 systeminformation: 5.22.7 tx2: 1.0.5 @@ -18211,7 +18215,7 @@ snapshots: commander: 2.15.1 croner: 4.1.97 dayjs: 1.11.11 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 enquirer: 2.3.6 eventemitter2: 5.0.1 fclone: 1.0.11 @@ -18322,7 +18326,7 @@ snapshots: proxy-agent@6.3.1: dependencies: agent-base: 7.1.1 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.7 http-proxy-agent: 7.0.2 https-proxy-agent: 7.0.4 lru-cache: 7.18.3 @@ -18422,7 +18426,7 @@ snapshots: command-line-usage: 6.1.3 config: 3.3.11 configstore: 5.0.1 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 editor: 1.0.0 enquirer: 2.4.1 form-data: 4.0.0 @@ -18548,7 +18552,7 @@ snapshots: require-in-the-middle@5.2.0: dependencies: - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 module-details-from-path: 1.0.3 resolve: 1.22.8 transitivePeerDependencies: @@ -18599,7 +18603,7 @@ snapshots: retry-request@4.2.2: dependencies: - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.7 extend: 3.0.2 transitivePeerDependencies: - supports-color @@ -18759,7 +18763,7 @@ snapshots: dependencies: '@types/debug': 4.1.12 '@types/validator': 13.11.9 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 dottie: 2.0.6 inflection: 1.13.4 lodash: 4.17.21 @@ -18895,7 +18899,7 @@ snapshots: socket.io-adapter@2.5.4(bufferutil@4.0.8)(utf-8-validate@5.0.10): dependencies: - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 ws: 8.11.0(bufferutil@4.0.8)(utf-8-validate@5.0.10) transitivePeerDependencies: - bufferutil @@ -18905,7 +18909,7 @@ snapshots: socket.io-parser@4.2.4: dependencies: '@socket.io/component-emitter': 3.1.2 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 transitivePeerDependencies: - supports-color @@ -18914,7 +18918,7 @@ snapshots: accepts: 1.3.8 base64id: 2.0.0 cors: 2.8.5 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 engine.io: 6.5.4(bufferutil@4.0.8)(utf-8-validate@5.0.10) socket.io-adapter: 2.5.4(bufferutil@4.0.8)(utf-8-validate@5.0.10) socket.io-parser: 4.2.4 @@ -18934,7 +18938,7 @@ snapshots: socks-proxy-agent@8.0.3: dependencies: agent-base: 7.1.1 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.7 socks: 2.8.3 transitivePeerDependencies: - supports-color @@ -19117,7 +19121,7 @@ snapshots: dependencies: component-emitter: 1.3.1 cookiejar: 2.1.4 - debug: 4.3.4(supports-color@5.5.0) + debug: 4.3.4 fast-safe-stringify: 2.1.1 form-data: 4.0.0 formidable: 2.1.2 diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts index a5d66ba987..15fc128e82 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts @@ -188,6 +188,27 @@ export default class ActivityRepository extends RepositoryBase { + const result = await this.db().oneOrNone('select 1 from activities where id = $(id)', { id }) + return result !== null + } + + public async rawUpdate(id: string, data: IDbActivityUpdateData): Promise { + const prepared = RepositoryBase.prepare( + { ...data, updatedAt: new Date() }, + this.updateActivityColumnSet, + ) + const query = this.dbInstance.helpers.update(prepared, this.updateActivityColumnSet) + const condition = this.format('where id = $(id)', { id }) + await this.db().none(`${query} ${condition}`) + } + + public async rawInsert(data: IDbActivityCreateData): Promise { + const prepared = RepositoryBase.prepare(data, this.insertActivityColumnSet) + const query = this.dbInstance.helpers.insert(prepared, this.insertActivityColumnSet) + await this.db().none(query) + } + public async update( id: string, tenantId: string, diff --git a/services/libs/logging/src/utility.ts b/services/libs/logging/src/utility.ts index d9bd69f02d..a6dff1b87b 100644 --- a/services/libs/logging/src/utility.ts +++ b/services/libs/logging/src/utility.ts @@ -42,11 +42,11 @@ export const logExecutionTimeV2 = async ( } } -export const timer = (log: Logger, name: string) => { +export const timer = (log: Logger, name?: string) => { const start = performance.now() let isEnded = false return { - end: function () { + end: function (overrideName?: string) { if (isEnded) { return } @@ -55,7 +55,7 @@ export const timer = (log: Logger, name: string) => { const end = performance.now() const duration = end - start const durationInSeconds = duration / 1000 - log.info(`Process ${name} took ${durationInSeconds.toFixed(2)} seconds!`) + log.info(`Process ${overrideName ?? name} took ${durationInSeconds.toFixed(2)} seconds!`) }, } } From 218a934a64157b2937a90a4785f935fa9b0847b2 Mon Sep 17 00:00:00 2001 From: Misha Savelyev Date: Thu, 31 Oct 2024 14:47:41 +0000 Subject: [PATCH 06/11] Create index on activities(updatedAt) So we can quickly figure out what was the latest synced activity --- .../migrations/U1730386050__activities-updated-at-index.sql | 0 .../migrations/V1730386050__activities-updated-at-index.sql | 1 + 2 files changed, 1 insertion(+) create mode 100644 backend/src/database/migrations/U1730386050__activities-updated-at-index.sql create mode 100644 backend/src/database/migrations/V1730386050__activities-updated-at-index.sql diff --git a/backend/src/database/migrations/U1730386050__activities-updated-at-index.sql b/backend/src/database/migrations/U1730386050__activities-updated-at-index.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/src/database/migrations/V1730386050__activities-updated-at-index.sql b/backend/src/database/migrations/V1730386050__activities-updated-at-index.sql new file mode 100644 index 0000000000..c8ccafc20d --- /dev/null +++ b/backend/src/database/migrations/V1730386050__activities-updated-at-index.sql @@ -0,0 +1 @@ +CREATE INDEX CONCURRENTLY IF NOT EXISTS activities_updated_at ON activities ("updatedAt"); From 68d2f8a10b511a61e1773e313d8d9f6fee4d29fa Mon Sep 17 00:00:00 2001 From: Misha Savelyev Date: Mon, 4 Nov 2024 07:44:21 +0000 Subject: [PATCH 07/11] Make sure we always update `updatedAt` when changing activities The second arg in `insertActivities` forcefully sets `updatedAt` to "now" --- backend/src/services/activityService.ts | 2 +- .../data-access-layer/src/activities/update.ts | 15 +++++++++------ .../src/old/apps/profiles_worker/index.ts | 2 +- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/backend/src/services/activityService.ts b/backend/src/services/activityService.ts index 10b796d203..e86c656564 100644 --- a/backend/src/services/activityService.ts +++ b/backend/src/services/activityService.ts @@ -181,7 +181,7 @@ export default class ActivityService extends LoggerBase { ) record = await ActivityRepository.create(data, repositoryOptions) - await insertActivities([{ ...data, id: record.id }]) + await insertActivities([{ ...data, id: record.id }], true) // Only track activity's platform and timestamp and memberId. It is completely annonymous. telemetryTrack( diff --git a/services/libs/data-access-layer/src/activities/update.ts b/services/libs/data-access-layer/src/activities/update.ts index 2d4fb05a1e..a4cfc7537b 100644 --- a/services/libs/data-access-layer/src/activities/update.ts +++ b/services/libs/data-access-layer/src/activities/update.ts @@ -41,12 +41,15 @@ export async function updateActivities( return streamActivities( qdb, async (activity) => { - await insertActivities([ - { - ...activity, - ...(await mapActivity(activity)), - }, - ]) + await insertActivities( + [ + { + ...activity, + ...(await mapActivity(activity)), + }, + ], + true, + ) }, where, params, diff --git a/services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts b/services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts index d64d2d76f9..4be9d9a139 100644 --- a/services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts +++ b/services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts @@ -149,7 +149,7 @@ export async function runMemberAffiliationsUpdate( } activity.organizationId = condition.orgId - await insertActivities([activity]) + await insertActivities([activity], true) return } } From 671b3104804ac28ff89a3ff03488d404e042d150 Mon Sep 17 00:00:00 2001 From: Misha Savelyev Date: Wed, 6 Nov 2024 07:53:28 +0000 Subject: [PATCH 08/11] Clean up modifying code of activities in postgres --- .../data_sink_worker/repo/activity.repo.ts | 49 ------------------- 1 file changed, 49 deletions(-) diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts index 15fc128e82..bab9df445d 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts @@ -1,4 +1,3 @@ -import { generateUUIDv1 } from '@crowd/common' import { DbColumnSet, DbStore, RepositoryBase, eqOrNull } from '@crowd/database' import { Logger } from '@crowd/logging' @@ -166,28 +165,6 @@ export default class ActivityRepository extends RepositoryBase { - this.log.debug('Creating an activity in PostgreSQL!') - - const id = generateUUIDv1() - const ts = new Date() - const prepared = RepositoryBase.prepare( - { ...data, id, tenantId, segmentId, createdAt: ts, updatedAt: ts }, - this.insertActivityColumnSet, - ) - const query = this.dbInstance.helpers.insert(prepared, this.insertActivityColumnSet) - - await this.db().none(query) - - await this.updateParentIds(tenantId, segmentId, id, data) - - return id - } - public async existsWithId(id: string): Promise { const result = await this.db().oneOrNone('select 1 from activities where id = $(id)', { id }) return result !== null @@ -208,30 +185,4 @@ export default class ActivityRepository extends RepositoryBase { - const prepared = RepositoryBase.prepare( - { ...data, updatedAt: new Date() }, - this.updateActivityColumnSet, - ) - const query = this.dbInstance.helpers.update(prepared, this.updateActivityColumnSet) - const condition = this.format( - 'where id = $(id) and "tenantId" = $(tenantId) and "segmentId" = $(segmentId)', - { - id, - tenantId, - segmentId, - }, - ) - const result = await this.db().result(`${query} ${condition}`) - - this.checkUpdateRowCount(result.rowCount, 1) - - await this.updateParentIds(tenantId, segmentId, id, data) - } } From 657c99b9ee884585c5549449f6a736a1efcee3a4 Mon Sep 17 00:00:00 2001 From: Misha Savelyev Date: Wed, 6 Nov 2024 10:40:01 +0000 Subject: [PATCH 09/11] Use timestamp too when finding already existing activity This speeds up questdb selects --- services/apps/data_sink_worker/src/service/activity.service.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 56c3c142de..2c505e44cc 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -651,6 +651,7 @@ export default class ActivityService extends LoggerBase { segmentIds: [segmentId], filter: { and: [ + { timestamp: { eq: activity.timestamp } }, { sourceId: { eq: activity.sourceId } }, { platform: { eq: platform } }, { type: { eq: activity.type } }, From bef53ef0777745cf033ccd5298c14c512b9c1a89 Mon Sep 17 00:00:00 2001 From: Misha Savelyev Date: Wed, 6 Nov 2024 10:41:04 +0000 Subject: [PATCH 10/11] Clean up unused modifying sql code of activities --- .../data-access-layer/src/activities/sql.ts | 87 ------------------- .../data_sink_worker/repo/activity.repo.ts | 42 --------- 2 files changed, 129 deletions(-) diff --git a/services/libs/data-access-layer/src/activities/sql.ts b/services/libs/data-access-layer/src/activities/sql.ts index b1d1af43c7..f6a58a3bbb 100644 --- a/services/libs/data-access-layer/src/activities/sql.ts +++ b/services/libs/data-access-layer/src/activities/sql.ts @@ -13,7 +13,6 @@ import { ITimeseriesDatapoint, MemberIdentityType, PageData, - PlatformType, } from '@crowd/types' import { IMemberSegmentAggregates } from '../members/types' @@ -68,92 +67,6 @@ export async function getActivitiesById( return data.rows } -export async function insertActivity( - conn: DbConnOrTx, - id: string, - data: IDbActivityCreateData, -): Promise { - const now = new Date() - - const toInsert: any = { - id, - type: data.type, - timestamp: data.timestamp, - platform: data.platform, - isContribution: data.isContribution, - score: data.score, - importHash: data.importHash, - sourceId: data.sourceId, - createdAt: now, - updatedAt: now, - memberId: data.memberId, - parentId: data.parentId, - tenantId: data.tenantId, - createdById: data.createdById, - updatedById: data.updatedById, - sourceParentId: data.sourceParentId, - conversationId: data.conversationId, - attributes: JSON.stringify(data.attributes || {}), - title: data.title, - body: data.body, - channel: data.channel, - url: data.url, - username: data.username, - objectMemberId: data.objectMemberId, - objectMemberUsername: data.objectMemberUsername, - segmentId: data.segmentId, - organizationId: data.organizationId, - - member_isTeamMember: data.isTeamMemberActivity, - member_isBot: data.isBotActivity, - } - - if (data.sentiment) { - toInsert.sentimentLabel = data.sentiment.label - toInsert.sentimentScore = data.sentiment.sentiment - toInsert.sentimentScoreMixed = data.sentiment.mixed - toInsert.sentimentScoreNeutral = data.sentiment.neutral - toInsert.sentimentScoreNegative = data.sentiment.negative - toInsert.sentimentScorePositive = data.sentiment.positive - } - - if ( - (data.attributes && data.platform === PlatformType.GIT) || - data.platform === PlatformType.GITHUB - ) { - if (data.attributes.isMainBranch) { - toInsert.gitIsMainBranch = data.attributes.isMainBranch - } - if (data.attributes.isIndirectFork) { - toInsert.gitIsIndirectFork = data.attributes.isIndirectFork - } - if (data.attributes.additions) { - toInsert.gitInsertions = data.attributes.additions - } - if (data.attributes.deletions) { - toInsert.gitDeletions = data.attributes.deletions - } - } - - const columns = Object.keys(toInsert) - const columnStrings: string[] = [] - const valueStrings: string[] = [] - for (const column of columns) { - columnStrings.push(`"${column}"`) - valueStrings.push(`$(${column})`) - } - - const query = ` - insert into activities (${columnStrings.join(', ')}) - values (${valueStrings.join(', ')}); - ` - - const result = await conn.result(query, toInsert) - console.log('activity insert result', result.rowCount) - checkUpdateRowCount(result.rowCount, 1) - await updateActivityParentIds(conn, id, data) -} - const ACTIVITY_UPDATABLE_COLUMNS: ActivityColumn[] = [ 'type', 'isContribution', diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts index bab9df445d..225fe8fd0f 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts @@ -123,48 +123,6 @@ export default class ActivityRepository extends RepositoryBase { - const promises: Promise[] = [ - this.db().none( - ` - update activities set "parentId" = $(id) - where "tenantId" = $(tenantId) and "sourceParentId" = $(sourceId) - and "segmentId" = $(segmentId) - `, - { - id, - tenantId, - segmentId, - sourceId: data.sourceId, - }, - ), - ] - - if (data.sourceParentId) { - promises.push( - this.db().none( - ` - update activities set "parentId" = (select id from activities where "tenantId" = $(tenantId) and "sourceId" = $(sourceParentId) and "segmentId" = $(segmentId) and "deletedAt" IS NULL limit 1) - where "id" = $(id) and "tenantId" = $(tenantId) and "segmentId" = $(segmentId) - `, - { - id, - tenantId, - segmentId, - sourceParentId: data.sourceParentId, - }, - ), - ) - } - - await Promise.all(promises) - } - public async existsWithId(id: string): Promise { const result = await this.db().oneOrNone('select 1 from activities where id = $(id)', { id }) return result !== null From 3a42be8572b45da9d2a38d86c6347e7fafaba244 Mon Sep 17 00:00:00 2001 From: Misha Savelyev Date: Wed, 6 Nov 2024 10:41:57 +0000 Subject: [PATCH 11/11] Update activities in merging-entity-worker via questdb --- .../src/activities/members.ts | 2 +- .../old/apps/entity_merging_worker/index.ts | 43 ++++++++----------- 2 files changed, 19 insertions(+), 26 deletions(-) diff --git a/services/apps/entity_merging_worker/src/activities/members.ts b/services/apps/entity_merging_worker/src/activities/members.ts index 24a556170f..15086731ea 100644 --- a/services/apps/entity_merging_worker/src/activities/members.ts +++ b/services/apps/entity_merging_worker/src/activities/members.ts @@ -66,7 +66,7 @@ export async function moveActivitiesWithIdentityToAnotherMember( identitiesWithActivity.some((ai) => ai.platform === i.platform && ai.username === i.value), )) { await moveIdentityActivitiesToNewMember( - svc.postgres.writer, + svc.questdbSQL, tenantId, fromId, toId, diff --git a/services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts b/services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts index dfee5341dd..ad5fdbccaf 100644 --- a/services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts +++ b/services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts @@ -2,6 +2,8 @@ import { DbConnOrTx, DbStore } from '@crowd/database' import { IActivityIdentity, IMemberIdentity, MergeActionState, MergeActionStep } from '@crowd/types' import { updateActivities } from '../../../activities/update' +import { formatQuery } from '../../../queryExecutor' +import { IDbActivityCreateData } from '../data_sink_worker/repo/activity.data' import { ISegmentIds } from './types' @@ -116,44 +118,35 @@ export async function getIdentitiesWithActivity( } export async function moveIdentityActivitiesToNewMember( - db: DbStore, + db: DbConnOrTx, tenantId: string, fromId: string, toId: string, username: string, platform: string, - batchSize = 1000, ) { - let rowsUpdated - - do { - const result = await db.connection().query( + await updateActivities( + db, + async (activity: IDbActivityCreateData) => ({ ...activity, memberId: toId }), + formatQuery( ` - UPDATE activities - SET "memberId" = $(toId) - WHERE id in ( - select id from activities - where "memberId" = $(fromId) - and "tenantId" = $(tenantId) - and "username" = $(username) - and "platform" = $(platform) - and "deletedAt" is null - limit $(batchSize) - ) - returning id - `, + "memberId" = $(fromId) + and "tenantId" = $(tenantId) + and "username" = $(username) + and "platform" = $(platform) + and "deletedAt" is null + `, { - toId, fromId, tenantId, username, platform, - batchSize, }, - ) - - rowsUpdated = result.length - } while (rowsUpdated === batchSize) + ), + { + memberId: fromId, + }, + ) } export async function findMemberSegments(db: DbStore, memberId: string): Promise {