Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Managing activity relations in a separate table #2795

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"script:refreshGithubRepoSettings": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/refresh-github-repo-settings.ts"
},
"lint-staged": {
"**/*.*": [
"**/*.ts": [
"eslint",
"prettier --write"
]
Expand Down
Empty file.
55 changes: 55 additions & 0 deletions backend/src/database/migrations/V1737705711__activityRelations.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
create table public."activityRelations" (
"activityId" uuid not null primary key,
"memberId" uuid not null,
"objectMemberId" uuid null,
"organizationId" uuid null,
"conversationId" uuid null,
"parentId" uuid null,
"segmentId" uuid not null,
"platform" text not null,
"username" text not null,
"objectMemberUsername" text null,
"createdAt" timestamp with time zone default now() not null,
"updatedAt" timestamp with time zone default now() not null,
foreign key ("memberId") references members (id) on delete cascade,
foreign key ("organizationId") references organizations (id) on delete set null,
foreign key ("objectMemberId") references members (id) on delete set null,
foreign key ("conversationId") references conversations (id) on delete set null,
foreign key ("segmentId") references segments (id) on delete cascade,
unique ("activityId", "memberId")
);
create index "ix_activityRelations_memberId" on "activityRelations"("memberId");
create index "ix_activityRelations_organizationId" on "activityRelations"("organizationId");
create index "ix_activityRelations_platform_username" on "activityRelations"("platform", "username");


DO
$$
DECLARE
batch_size INT := 100000;
last_processed_id UUID := '00000000-0000-0000-0000-000000000000';
total_processed INT := 0;
rows_inserted INT;
BEGIN
LOOP
INSERT INTO "activityRelations" ("activityId", "memberId", "objectMemberId", "organizationId", "conversationId", "parentId", "platform", "username", "objectMemberUsername")
SELECT id, "memberId", "objectMemberId", "organizationId", "conversationId", "parentId", "platform", "username", "objectMemberUsername"
FROM activities
WHERE id > last_processed_id
ORDER BY id
LIMIT batch_size;

GET DIAGNOSTICS rows_inserted = ROW_COUNT;

total_processed := total_processed + rows_inserted;
RAISE NOTICE 'Batch processed: % rows. Total processed: % rows.', rows_inserted, total_processed;

EXIT WHEN rows_inserted = 0;

SELECT MAX(id) INTO last_processed_id FROM activities WHERE id > last_processed_id;

END LOOP;

RAISE NOTICE 'All rows processed. Total rows inserted: %.', total_processed;
END
$$;
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import {
changeOverride as changeMemberOrganizationAffiliationOverride,
findOverrides as findMemberOrganizationAffiliationOverrides,
findPrimaryWorkExperiencesOfMember,
} from '@crowd/data-access-layer/src/member_organization_affiliation_overrides'
import { IChangeAffiliationOverrideData } from '@crowd/types'
import {
IChangeAffiliationOverrideData,
IMemberOrganizationAffiliationOverride,
} from '@crowd/types'

import { IRepositoryOptions } from '../IRepositoryOptions'
import SequelizeRepository from '../sequelizeRepository'
Expand All @@ -17,6 +21,14 @@ class MemberOrganizationAffiliationOverridesRepository {
])
return overrides[0]
}

static async findPrimaryWorkExperiences(
memberId: string,
options: IRepositoryOptions,
): Promise<IMemberOrganizationAffiliationOverride[]> {
const qx = SequelizeRepository.getQueryExecutor(options)
return findPrimaryWorkExperiencesOfMember(qx, memberId)
}
}

export default MemberOrganizationAffiliationOverridesRepository
37 changes: 36 additions & 1 deletion backend/src/services/member/memberAffiliationsService.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable no-continue */
import { uniq } from 'lodash'

import { groupBy } from '@crowd/common'
import { Error400, dateIntersects, groupBy } from '@crowd/common'
import { findMaintainerRoles } from '@crowd/data-access-layer/src/maintainers'
import { fetchManySegments } from '@crowd/data-access-layer/src/segments'
import { LoggerBase } from '@crowd/logging'
Expand All @@ -13,6 +13,7 @@ import {

import MemberAffiliationsRepository from '@/database/repositories/member/memberAffiliationsRepository'
import MemberOrganizationAffiliationOverridesRepository from '@/database/repositories/member/memberOrganizationAffiliationOverridesRepository'
import MemberOrganizationsRepository from '@/database/repositories/member/memberOrganizationsRepository'
import SequelizeRepository from '@/database/repositories/sequelizeRepository'

import { IServiceOptions } from '../IServiceOptions'
Expand Down Expand Up @@ -66,6 +67,40 @@ export default class MemberAffiliationsService extends LoggerBase {
async changeAffiliationOverride(
data: IChangeAffiliationOverrideData,
): Promise<IMemberOrganizationAffiliationOverride> {
if (data.isPrimaryWorkExperience) {
// check if any other work experience in intersecting date range was marked as primary
// we don't allow this because "isPrimaryWorkExperience" decides which work exp to pick on date conflicts
const allWorkExperiencesOfMember = (
await MemberOrganizationsRepository.list(data.memberId, this.options)
).map((mo) => mo.memberOrganizations)

const currentlyEditedWorkExperience = allWorkExperiencesOfMember.find(
(w) => w.id === data.memberOrganizationId,
)

const primaryWorkExperiencesOfMember = allWorkExperiencesOfMember.filter(
(w) => w.affiliationOverride.isPrimaryWorkExperience,
)

if (currentlyEditedWorkExperience.affiliationOverride.isPrimaryWorkExperience === false) {
for (const existingPrimaryWorkExp of primaryWorkExperiencesOfMember) {
if (
dateIntersects(
existingPrimaryWorkExp.dateStart as string,
existingPrimaryWorkExp.dateEnd as string,
currentlyEditedWorkExperience.dateStart as string,
currentlyEditedWorkExperience.dateEnd as string,
)
) {
throw new Error400(
this.options.language,
`Date range conflicts with another primary work experience id = ${existingPrimaryWorkExp.id}`,
)
}
}
}
}

const override = MemberOrganizationAffiliationOverridesRepository.changeOverride(
data,
this.options,
Expand Down
24 changes: 24 additions & 0 deletions services/apps/data_sink_worker/src/service/activity.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
} from '@crowd/common'
import { SearchSyncWorkerEmitter } from '@crowd/common_services'
import {
createOrUpdateRelations,
findCommitsForPRSha,
findMatchingPullRequestNodeId,
insertActivities,
Expand All @@ -31,6 +32,7 @@ import { IDbMember } from '@crowd/data-access-layer/src/old/apps/data_sink_worke
import MemberRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/member.repo'
import RequestedForErasureMemberIdentitiesRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo'
import SettingsRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/settings.repo'
import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor'
import { DEFAULT_ACTIVITY_TYPE_SETTINGS, GithubActivityType } from '@crowd/integrations'
import { GitActivityType } from '@crowd/integrations/src/integrations/git/types'
import { Logger, LoggerBase, getChildLogger } from '@crowd/logging'
Expand Down Expand Up @@ -84,6 +86,7 @@ export default class ActivityService extends LoggerBase {
})

const id = await this.pgStore.transactionally(async (txStore) => {
const queryExecutor = dbStoreQx(txStore)
const txSettingsRepo = new SettingsRepository(txStore, this.log)

await txSettingsRepo.createActivityType(
Expand Down Expand Up @@ -132,6 +135,16 @@ export default class ActivityService extends LoggerBase {
importHash: activity.importHash,
},
])
await createOrUpdateRelations(queryExecutor, {
activityId: activity.id,
segmentId,
memberId: activity.memberId,
objectMemberId: activity.objectMemberId,
organizationId: activity.organizationId,
platform: activity.platform,
username: activity.username,
objectMemberUsername: activity.objectMemberUsername,
})
} catch (error) {
this.log.error('Error creating activity in QuestDB:', error)
throw error
Expand Down Expand Up @@ -160,6 +173,7 @@ export default class ActivityService extends LoggerBase {
try {
let toUpdate: IDbActivityUpdateData
const updated = await this.pgStore.transactionally(async (txStore) => {
const queryExecutor = dbStoreQx(txStore)
const txSettingsRepo = new SettingsRepository(txStore, this.log)

toUpdate = await this.mergeActivityData(activity, original)
Expand Down Expand Up @@ -215,6 +229,16 @@ export default class ActivityService extends LoggerBase {
importHash: original.importHash,
},
])
await createOrUpdateRelations(queryExecutor, {
activityId: id,
segmentId,
memberId: toUpdate.memberId || original.memberId,
objectMemberId: toUpdate.objectMemberId || original.objectMemberId,
organizationId: toUpdate.organizationId || original.organizationId,
platform: toUpdate.platform || (original.platform as PlatformType),
username: toUpdate.username || original.username,
objectMemberUsername: toUpdate.objectMemberUsername || original.objectMemberUsername,
})
} catch (error) {
this.log.error('Error updating (by inserting) activity in QuestDB:', error)
throw error
Expand Down
12 changes: 12 additions & 0 deletions services/apps/entity_merging_worker/src/activities/members.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { WorkflowIdReusePolicy } from '@temporalio/workflow'

import {
moveActivityRelationsToAnotherMember,
moveActivityRelationsWithIdentityToAnotherMember,
} from '@crowd/data-access-layer'
import { cleanupMemberAggregates } from '@crowd/data-access-layer/src/members/segments'
import {
cleanupMember,
Expand Down Expand Up @@ -39,6 +43,7 @@ export async function moveActivitiesBetweenMembers(
return
}
await moveActivitiesToNewMember(svc.questdbSQL, svc.queue, primaryId, secondaryId, tenantId)
await moveActivityRelationsToAnotherMember(dbStoreQx(svc.postgres.writer), secondaryId, primaryId)
}

export async function moveActivitiesWithIdentityToAnotherMember(
Expand Down Expand Up @@ -74,6 +79,13 @@ export async function moveActivitiesWithIdentityToAnotherMember(
identity.value,
identity.platform,
)
await moveActivityRelationsWithIdentityToAnotherMember(
dbStoreQx(svc.postgres.writer),
fromId,
toId,
identity.value,
identity.platform,
)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { moveActivityRelationsToAnotherOrganization } from '@crowd/data-access-layer'
import {
deleteOrganizationById,
deleteOrganizationSegments,
Expand Down Expand Up @@ -30,6 +31,11 @@ export async function moveActivitiesBetweenOrgs(
tenantId: string,
): Promise<void> {
await moveActivitiesToNewOrg(svc.questdbSQL, svc.queue, primaryId, secondaryId, tenantId)
await moveActivityRelationsToAnotherOrganization(
dbStoreQx(svc.postgres.writer),
primaryId,
secondaryId,
)
}

export async function recalculateActivityAffiliationsOfOrganizationSynchronous(
Expand Down
22 changes: 22 additions & 0 deletions services/libs/common/src/timing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,25 @@ export const getLongestDateRange = <T extends IMemberOrganization>(orgs: T[]) =>

return sortedByDateRange[0]
}

export const dateIntersects = (
d1Start?: string | null,
d1End?: string | null,
d2Start?: string | null,
d2End?: string | null,
): boolean => {
// If both periods have no dates at all, we consider they span all time
if ((!d1Start && !d1End) || (!d2Start && !d2End)) {
return true
}

// Convert strings to timestamps, using fallbacks for missing dates
const start1 = d1Start ? new Date(d1Start).getTime() : -Infinity
const end1 = d1End ? new Date(d1End).getTime() : Infinity
const start2 = d2Start ? new Date(d2Start).getTime() : -Infinity
const end2 = d2End ? new Date(d2End).getTime() : Infinity

// Periods intersect if one period's start is before other period's end
// and that same period's end is after the other period's start
return start1 <= end2 && end1 >= start2
}
Loading
Loading