-- Creates "activityRelations": one row per activity holding the ids and
-- identity fields that relate it to members, organizations, conversations and
-- segments, then backfills the table from the existing "activities" table.
create table public."activityRelations" (
    "activityId" uuid not null primary key,
    "memberId" uuid not null,
    "objectMemberId" uuid null,
    "organizationId" uuid null,
    "conversationId" uuid null,
    "parentId" uuid null,
    "segmentId" uuid not null,
    "platform" text not null,
    "username" text not null,
    "objectMemberUsername" text null,
    "createdAt" timestamp with time zone default now() not null,
    "updatedAt" timestamp with time zone default now() not null,
    foreign key ("memberId") references members (id) on delete cascade,
    foreign key ("organizationId") references organizations (id) on delete set null,
    foreign key ("objectMemberId") references members (id) on delete set null,
    foreign key ("conversationId") references conversations (id) on delete set null,
    foreign key ("segmentId") references segments (id) on delete cascade,
    -- Redundant for uniqueness (activityId is already the primary key), but an
    -- upsert that names ("activityId", "memberId") as its ON CONFLICT target
    -- needs a constraint on exactly these columns.
    unique ("activityId", "memberId")
);

create index "ix_activityRelations_memberId" on "activityRelations"("memberId");
create index "ix_activityRelations_organizationId" on "activityRelations"("organizationId");
create index "ix_activityRelations_platform_username" on "activityRelations"("platform", "username");


-- Backfill from "activities" in keyset-paginated batches ordered by id.
DO
$$
DECLARE
    batch_size INT := 100000;
    -- Keyset cursor: the highest activity id copied so far.
    last_processed_id UUID := '00000000-0000-0000-0000-000000000000';
    batch_last_id UUID;
    total_processed BIGINT := 0;
    rows_inserted BIGINT;
BEGIN
    LOOP
        -- "segmentId" is NOT NULL without a default, so it has to be copied too.
        -- NOTE(review): assumes activities has a "segmentId" column - confirm.
        WITH inserted AS (
            INSERT INTO "activityRelations" ("activityId", "memberId", "objectMemberId", "organizationId", "conversationId", "parentId", "segmentId", "platform", "username", "objectMemberUsername")
            SELECT id, "memberId", "objectMemberId", "organizationId", "conversationId", "parentId", "segmentId", "platform", "username", "objectMemberUsername"
            FROM activities
            WHERE id > last_processed_id
            ORDER BY id
            LIMIT batch_size
            RETURNING "activityId"
        )
        -- Take the row count and the highest id of THIS batch only. Advancing
        -- the cursor to the maximum id of all remaining rows would skip every
        -- row after the first batch. array_agg is used instead of max()
        -- because max(uuid) is not available on all PostgreSQL versions.
        SELECT count(*), (array_agg("activityId" ORDER BY "activityId" DESC))[1]
        INTO rows_inserted, batch_last_id
        FROM inserted;

        total_processed := total_processed + rows_inserted;
        RAISE NOTICE 'Batch processed: % rows. Total processed: % rows.', rows_inserted, total_processed;

        EXIT WHEN rows_inserted = 0;

        last_processed_id := batch_last_id;

    END LOOP;

    RAISE NOTICE 'All rows processed. Total rows inserted: %.', total_processed;
END
$$;
import { fetchManySegments } from '@crowd/data-access-layer/src/segments' import { LoggerBase } from '@crowd/logging' @@ -13,6 +13,7 @@ import { import MemberAffiliationsRepository from '@/database/repositories/member/memberAffiliationsRepository' import MemberOrganizationAffiliationOverridesRepository from '@/database/repositories/member/memberOrganizationAffiliationOverridesRepository' +import MemberOrganizationsRepository from '@/database/repositories/member/memberOrganizationsRepository' import SequelizeRepository from '@/database/repositories/sequelizeRepository' import { IServiceOptions } from '../IServiceOptions' @@ -66,6 +67,40 @@ export default class MemberAffiliationsService extends LoggerBase { async changeAffiliationOverride( data: IChangeAffiliationOverrideData, ): Promise { + if (data.isPrimaryWorkExperience) { + // check if any other work experience in intersecting date range was marked as primary + // we don't allow this because "isPrimaryWorkExperience" decides which work exp to pick on date conflicts + const allWorkExperiencesOfMember = ( + await MemberOrganizationsRepository.list(data.memberId, this.options) + ).map((mo) => mo.memberOrganizations) + + const currentlyEditedWorkExperience = allWorkExperiencesOfMember.find( + (w) => w.id === data.memberOrganizationId, + ) + + const primaryWorkExperiencesOfMember = allWorkExperiencesOfMember.filter( + (w) => w.affiliationOverride.isPrimaryWorkExperience, + ) + + if (currentlyEditedWorkExperience.affiliationOverride.isPrimaryWorkExperience === false) { + for (const existingPrimaryWorkExp of primaryWorkExperiencesOfMember) { + if ( + dateIntersects( + existingPrimaryWorkExp.dateStart as string, + existingPrimaryWorkExp.dateEnd as string, + currentlyEditedWorkExperience.dateStart as string, + currentlyEditedWorkExperience.dateEnd as string, + ) + ) { + throw new Error400( + this.options.language, + `Date range conflicts with another primary work experience id = ${existingPrimaryWorkExp.id}`, + ) + } + } 
+ } + } + const override = MemberOrganizationAffiliationOverridesRepository.changeOverride( data, this.options, diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 5b2b6aa0e4..2f7ebd2c92 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -12,6 +12,7 @@ import { } from '@crowd/common' import { SearchSyncWorkerEmitter } from '@crowd/common_services' import { + createOrUpdateRelations, findCommitsForPRSha, findMatchingPullRequestNodeId, insertActivities, @@ -31,6 +32,7 @@ import { IDbMember } from '@crowd/data-access-layer/src/old/apps/data_sink_worke import MemberRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/member.repo' import RequestedForErasureMemberIdentitiesRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo' import SettingsRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/settings.repo' +import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor' import { DEFAULT_ACTIVITY_TYPE_SETTINGS, GithubActivityType } from '@crowd/integrations' import { GitActivityType } from '@crowd/integrations/src/integrations/git/types' import { Logger, LoggerBase, getChildLogger } from '@crowd/logging' @@ -84,6 +86,7 @@ export default class ActivityService extends LoggerBase { }) const id = await this.pgStore.transactionally(async (txStore) => { + const queryExecutor = dbStoreQx(txStore) const txSettingsRepo = new SettingsRepository(txStore, this.log) await txSettingsRepo.createActivityType( @@ -132,6 +135,16 @@ export default class ActivityService extends LoggerBase { importHash: activity.importHash, }, ]) + await createOrUpdateRelations(queryExecutor, { + activityId: activity.id, + segmentId, + memberId: activity.memberId, + objectMemberId: 
activity.objectMemberId, + organizationId: activity.organizationId, + platform: activity.platform, + username: activity.username, + objectMemberUsername: activity.objectMemberUsername, + }) } catch (error) { this.log.error('Error creating activity in QuestDB:', error) throw error @@ -160,6 +173,7 @@ export default class ActivityService extends LoggerBase { try { let toUpdate: IDbActivityUpdateData const updated = await this.pgStore.transactionally(async (txStore) => { + const queryExecutor = dbStoreQx(txStore) const txSettingsRepo = new SettingsRepository(txStore, this.log) toUpdate = await this.mergeActivityData(activity, original) @@ -215,6 +229,16 @@ export default class ActivityService extends LoggerBase { importHash: original.importHash, }, ]) + await createOrUpdateRelations(queryExecutor, { + activityId: id, + segmentId, + memberId: toUpdate.memberId || original.memberId, + objectMemberId: toUpdate.objectMemberId || original.objectMemberId, + organizationId: toUpdate.organizationId || original.organizationId, + platform: toUpdate.platform || (original.platform as PlatformType), + username: toUpdate.username || original.username, + objectMemberUsername: toUpdate.objectMemberUsername || original.objectMemberUsername, + }) } catch (error) { this.log.error('Error updating (by inserting) activity in QuestDB:', error) throw error diff --git a/services/apps/entity_merging_worker/src/activities/members.ts b/services/apps/entity_merging_worker/src/activities/members.ts index 3b133ea565..e16fcb1081 100644 --- a/services/apps/entity_merging_worker/src/activities/members.ts +++ b/services/apps/entity_merging_worker/src/activities/members.ts @@ -1,5 +1,9 @@ import { WorkflowIdReusePolicy } from '@temporalio/workflow' +import { + moveActivityRelationsToAnotherMember, + moveActivityRelationsWithIdentityToAnotherMember, +} from '@crowd/data-access-layer' import { cleanupMemberAggregates } from '@crowd/data-access-layer/src/members/segments' import { cleanupMember, @@ -39,6 
/**
 * Tells whether two date ranges overlap. Range boundaries are inclusive, so
 * ranges that merely touch (one ends exactly when the other starts) overlap.
 *
 * A missing (null, undefined or empty) start is treated as "open towards the
 * past" and a missing end as "open towards the future". If either range has
 * neither a start nor an end it spans all time and the result is `true`.
 *
 * A boundary that is present but cannot be parsed by `Date` yields NaN, which
 * makes the comparison fail, so the result is `false` in that case.
 *
 * @param d1Start start of the first range
 * @param d1End end of the first range
 * @param d2Start start of the second range
 * @param d2End end of the second range
 * @returns `true` when the ranges share at least one instant
 */
export const dateIntersects = (
  d1Start?: string | null,
  d1End?: string | null,
  d2Start?: string | null,
  d2End?: string | null,
): boolean => {
  const firstSpansAllTime = !d1Start && !d1End
  const secondSpansAllTime = !d2Start && !d2End

  // A range without any boundary overlaps with everything
  if (firstSpansAllTime || secondSpansAllTime) {
    return true
  }

  // Timestamp of a boundary, or the given infinity when the boundary is missing
  const toTimestamp = (value: string | null | undefined, whenMissing: number): number => {
    if (!value) {
      return whenMissing
    }
    return new Date(value).getTime()
  }

  const firstFrom = toTimestamp(d1Start, -Infinity)
  const firstTo = toTimestamp(d1End, Infinity)
  const secondFrom = toTimestamp(d2Start, -Infinity)
  const secondTo = toTimestamp(d2End, Infinity)

  // Overlap means each range starts no later than the other one ends
  return secondTo >= firstFrom && secondFrom <= firstTo
}
/**
 * Columns of "activityRelations" that `updateActivityRelationsById` may set.
 * Only these names are ever spliced into the generated SQL.
 */
const ACTIVITY_RELATION_UPDATABLE_COLUMNS = [
  'memberId',
  'objectMemberId',
  'organizationId',
  'conversationId',
  'parentId',
  'segmentId',
  'platform',
  'username',
  'objectMemberUsername',
] as const

/** Columns of "activityRelations" whose value can be reassigned in bulk. */
type ActivityRelationOwnerColumn = 'memberId' | 'organizationId'

/**
 * Turns the value `qe.result` resolved with into a number of affected rows.
 *
 * NOTE(review): the return shape of `QueryExecutor.result` is not visible from
 * this block, so a plain row count, an array of returned rows and a
 * driver-style object exposing `rowCount` / `rows` are all accepted - confirm
 * against the QueryExecutor implementation and narrow if possible.
 */
function countAffectedRows(result: unknown): number {
  if (typeof result === 'number') return result
  if (Array.isArray(result)) return result.length

  if (typeof result === 'object' && result !== null) {
    const { rowCount, rows } = result as { rowCount?: unknown; rows?: unknown }
    if (typeof rowCount === 'number') return rowCount
    if (Array.isArray(rows)) return rows.length
  }

  return 0
}

/**
 * Repeatedly rewrites `column` from `fromId` to `toId` on at most `batchSize`
 * rows per statement until a batch comes back short.
 */
async function reassignActivityRelationsInBatches(
  qe: QueryExecutor,
  column: ActivityRelationOwnerColumn,
  extraConditions: string,
  params: { fromId: string; toId: string } & Record<string, unknown>,
  batchSize: number,
): Promise<void> {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    // LIMIT 0 would update nothing and never terminate the loop below
    throw new RangeError(`batchSize must be a positive integer, got ${batchSize}`)
  }

  // Rows rewritten to the same id would keep matching the filter forever
  if (params.fromId === params.toId) return

  const query = `
      UPDATE "activityRelations"
      SET "${column}" = $(toId)
      WHERE "activityId" in (
        select "activityId" from "activityRelations"
        where "${column}" = $(fromId)${extraConditions}
        limit $(batchSize)
      )
      returning "activityId"
    `

  let rowsUpdated: number

  do {
    rowsUpdated = countAffectedRows(await qe.result(query, { ...params, batchSize }))
    // a full batch means there may be more rows left to move
  } while (rowsUpdated >= batchSize)
}

/**
 * Updates the given columns of the "activityRelations" row identified by
 * `data.activityId`.
 *
 * Properties that are `undefined` are left untouched; `null` is written as SQL
 * NULL. When no updatable property is set, no query is issued.
 *
 * @param qe query executor the statement runs on
 * @param data activity id plus the columns to change
 */
export async function updateActivityRelationsById(
  qe: QueryExecutor,
  data: IActivityRelationUpdateById,
): Promise<void> {
  const assignments = ACTIVITY_RELATION_UPDATABLE_COLUMNS.filter(
    (column) => data[column] !== undefined,
  ).map((column) => `"${column}" = $(${column})`)

  if (assignments.length === 0) return

  const query = `UPDATE "activityRelations" SET ${assignments.join(', ')} WHERE "activityId" = $(activityId)`

  await qe.result(query, data)
}

/**
 * Reassigns every "activityRelations" row of member `fromId` to member `toId`,
 * `batchSize` rows at a time.
 *
 * @param qe query executor the statements run on
 * @param fromId member that currently owns the rows
 * @param toId member that should own the rows afterwards
 * @param batchSize maximum number of rows updated per statement
 * @throws RangeError when `batchSize` is not a positive integer
 */
export async function moveActivityRelationsToAnotherMember(
  qe: QueryExecutor,
  fromId: string,
  toId: string,
  batchSize = 5000,
) {
  await reassignActivityRelationsInBatches(qe, 'memberId', '', { toId, fromId }, batchSize)
}

/**
 * Reassigns the "activityRelations" rows of member `fromId` that were created
 * through one identity (`username` on `platform`) to member `toId`,
 * `batchSize` rows at a time.
 *
 * @param qe query executor the statements run on
 * @param fromId member that currently owns the rows
 * @param toId member that should own the rows afterwards
 * @param username identity value the rows must match
 * @param platform platform the rows must match
 * @param batchSize maximum number of rows updated per statement
 * @throws RangeError when `batchSize` is not a positive integer
 */
export async function moveActivityRelationsWithIdentityToAnotherMember(
  qe: QueryExecutor,
  fromId: string,
  toId: string,
  username: string,
  platform: string,
  batchSize = 5000,
) {
  await reassignActivityRelationsInBatches(
    qe,
    'memberId',
    ' and "username" = $(username) and "platform" = $(platform)',
    { toId, fromId, username, platform },
    batchSize,
  )
}

/**
 * Reassigns every "activityRelations" row of organization `fromId` to
 * organization `toId`, `batchSize` rows at a time.
 *
 * Note the argument order: the organization that loses its rows comes first.
 *
 * @param qe query executor the statements run on
 * @param fromId organization that currently owns the rows
 * @param toId organization that should own the rows afterwards
 * @param batchSize maximum number of rows updated per statement
 * @throws RangeError when `batchSize` is not a positive integer
 */
export async function moveActivityRelationsToAnotherOrganization(
  qe: QueryExecutor,
  fromId: string,
  toId: string,
  batchSize = 5000,
) {
  await reassignActivityRelationsInBatches(qe, 'organizationId', '', { toId, fromId }, batchSize)
}
"isPrimaryWorkExperience" + FROM "memberOrganizationAffiliationOverrides" + WHERE "memberId" = $(memberId) + AND "isPrimaryWorkExperience" = true + `, + { + memberId, + }, + ) + return overrides +} diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts index f2cb8da111..72b369e62e 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts @@ -60,6 +60,32 @@ export interface IDbActivityCreateData { deletedAt?: string } +export interface IActivityRelationCreateOrUpdateData { + activityId: string + memberId: string + objectMemberId?: string + organizationId?: string + conversationId?: string + parentId?: string + segmentId: string + platform: string + username: string + objectMemberUsername?: string +} + +export interface IActivityRelationUpdateById { + activityId: string + memberId?: string + objectMemberId?: string + organizationId?: string + conversationId?: string + parentId?: string + segmentId?: string + platform?: string + username?: string + objectMemberUsername?: string +} + let insertActivityColumnSet: DbColumnSet export const getInsertActivityColumnSet = (instance: DbInstance): DbColumnSet => { if (insertActivityColumnSet) { diff --git a/services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts b/services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts index f8e8e0341a..ef9d2a1de4 100644 --- a/services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts +++ b/services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts @@ -7,9 +7,9 @@ import { getServiceChildLogger } from '@crowd/logging' import { IQueue } from '@crowd/queue' import { IMemberOrganization } from '@crowd/types' -import { insertActivities } from '../../../activities' +import { insertActivities, 
updateActivityRelationsById } from '../../../activities' import { findMemberAffiliations } from '../../../member_segment_affiliations' -import { formatQuery, pgpQx } from '../../../queryExecutor' +import { dbStoreQx, formatQuery, pgpQx } from '../../../queryExecutor' import { IDbActivityCreateData } from '../data_sink_worker/repo/activity.data' import { IAffiliationsLastCheckedAt, IMemberId } from './types' @@ -232,10 +232,22 @@ export async function runMemberAffiliationsUpdate( memberOrganizations = memberOrganizations.filter( (row) => - row.title && + row.title !== null && + row.title !== undefined && !blacklistedTitles.some((t) => row.title.toLowerCase().includes(t.toLowerCase())), ) + // clean unknown dated work experiences if there is one marked as primary + const primaryUnknownDatedWorkExperience = memberOrganizations.find( + (row) => row.isPrimaryWorkExperience && !row.dateStart && !row.dateEnd, + ) + + if (primaryUnknownDatedWorkExperience) { + memberOrganizations = memberOrganizations.filter( + (row) => row.dateStart || row.id === primaryUnknownDatedWorkExperience.id, + ) + } + const timeline = buildTimeline(memberOrganizations) const orgCases: Condition[] = [ @@ -314,7 +326,7 @@ export async function runMemberAffiliationsUpdate( } async function insertIfMatches(activity: IDbActivityCreateData) { - activity.organizationId = null + activity.organizationId = fallbackOrganizationId || null if (orgCases.length > 0) { for (const condition of orgCases) { @@ -326,6 +338,7 @@ export async function runMemberAffiliationsUpdate( } await insertActivities(queueClient, [activity], true) + return activity } const qs = new QueryStream( @@ -342,10 +355,33 @@ export async function runMemberAffiliationsUpdate( { memberId }, ), ) + const batchSize = 100 + let activityRelationPromises: Promise[] = [] + + const batchProcessActivityRelations = async () => { + await Promise.all(activityRelationPromises) + activityRelationPromises = [] + } + const { processed, duration } = await 
qDb.stream(qs, async (stream) => { for await (const activity of stream) { - await insertIfMatches(activity as unknown as IDbActivityCreateData) + const activityWithCorrectOrgId = await insertIfMatches( + activity as unknown as IDbActivityCreateData, + ) + activityRelationPromises.push( + updateActivityRelationsById(dbStoreQx(pgDb), { + activityId: activityWithCorrectOrgId.id, + organizationId: activityWithCorrectOrgId.organizationId, + }), + ) + + if (activityRelationPromises.length >= batchSize) { + await batchProcessActivityRelations() + } } + + // process the last batch (if any) + await batchProcessActivityRelations() }) logger.info(`Updated ${processed} activities in ${duration}ms`)