diff --git a/backend/src/database/migrations/U1732118484__members-segments-agg-created-at.sql b/backend/src/database/migrations/U1732118484__members-segments-agg-created-at.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/src/database/migrations/V1732118484__members-segments-agg-created-at.sql b/backend/src/database/migrations/V1732118484__members-segments-agg-created-at.sql new file mode 100644 index 0000000000..5f416c9b33 --- /dev/null +++ b/backend/src/database/migrations/V1732118484__members-segments-agg-created-at.sql @@ -0,0 +1 @@ +ALTER TABLE "memberSegmentsAgg" ADD COLUMN IF NOT EXISTS "createdAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(); diff --git a/services/apps/entity_merging_worker/src/activities/organizations.ts b/services/apps/entity_merging_worker/src/activities/organizations.ts index b2759cd820..066c631ba7 100644 --- a/services/apps/entity_merging_worker/src/activities/organizations.ts +++ b/services/apps/entity_merging_worker/src/activities/organizations.ts @@ -66,13 +66,13 @@ export async function recalculateActivityAffiliationsOfOrganizationSynchronous( ) } -export async function syncOrganization(organizationId: string): Promise { +export async function syncOrganization(organizationId: string, syncStart: Date): Promise { const syncApi = new SearchSyncApiClient({ baseUrl: process.env['CROWD_SEARCH_SYNC_API_URL'], }) await syncApi.triggerOrganizationSync(organizationId) - await syncApi.triggerOrganizationMembersSync(null, organizationId) + await syncApi.triggerOrganizationMembersSync(null, organizationId, null, syncStart) } export async function notifyFrontendOrganizationUnmergeSuccessful( diff --git a/services/apps/entity_merging_worker/src/workflows/all.ts b/services/apps/entity_merging_worker/src/workflows/all.ts index 4a215474f7..d3416f2e05 100644 --- a/services/apps/entity_merging_worker/src/workflows/all.ts +++ b/services/apps/entity_merging_worker/src/workflows/all.ts @@ -21,7 +21,7 @@ const { notifyFrontendMemberUnmergeSuccessful, syncRemoveMember, } = proxyActivities({ - startToCloseTimeout: '15 minutes', + startToCloseTimeout: '60 minutes', }) export async function finishMemberMerging( @@ -99,7 +99,8 @@ export async function finishOrganizationMerging( await moveActivitiesBetweenOrgs(primaryId, secondaryId, tenantId) - await syncOrganization(primaryId) + const syncStart = new Date() + await syncOrganization(primaryId, syncStart) await deleteOrganization(secondaryId) await setMergeAction(primaryId, secondaryId, tenantId, { state: 'merged' as MergeActionState, @@ -128,8 +129,9 @@ export async function finishOrganizationUnmerging( }) await recalculateActivityAffiliationsOfOrganizationSynchronous(primaryId, tenantId) await recalculateActivityAffiliationsOfOrganizationSynchronous(secondaryId, tenantId) - await syncOrganization(primaryId) - await syncOrganization(secondaryId) + const syncStart = new Date() + await syncOrganization(primaryId, syncStart) + await syncOrganization(secondaryId, syncStart) await setMergeAction(primaryId, secondaryId, tenantId, { state: 'unmerged' as MergeActionState, step: MergeActionStep.UNMERGE_DONE, diff --git a/services/apps/search_sync_api/src/routes/member.ts b/services/apps/search_sync_api/src/routes/member.ts index 773d906d16..f96e2072e6 100644 --- a/services/apps/search_sync_api/src/routes/member.ts +++ b/services/apps/search_sync_api/src/routes/member.ts @@ -49,12 +49,14 @@ router.post( asyncWrap(async (req: ApiRequest, res) => { const memberSyncService = syncService(req) - const { organizationId } = req.body + const { organizationId, syncFrom } = req.body try { req.log.trace( `Calling memberSyncService.syncOrganizationMembers for organization ${organizationId}`, ) - await memberSyncService.syncOrganizationMembers(organizationId) + await memberSyncService.syncOrganizationMembers(organizationId, { + syncFrom: syncFrom ? new Date(syncFrom) : null, + }) res.sendStatus(200) } catch (error) { req.log.error(error) diff --git a/services/libs/data-access-layer/src/members/segments.ts b/services/libs/data-access-layer/src/members/segments.ts index 2b6af101d9..56157985a7 100644 --- a/services/libs/data-access-layer/src/members/segments.ts +++ b/services/libs/data-access-layer/src/members/segments.ts @@ -7,6 +7,14 @@ import { IMemberAbsoluteAggregates, IMemberSegmentAggregates } from './types' const log = getServiceChildLogger('organizations/segments') +export async function findLastSyncDate(qx: QueryExecutor, memberId: string): Promise { + const result = await qx.selectOneOrNone( + `SELECT MAX("createdAt") AS "lastSyncDate" FROM "memberSegmentsAgg" WHERE "memberId" = $(memberId)`, + { memberId }, + ) + return result?.lastSyncDate ? new Date(result.lastSyncDate) : null +} + export async function cleanupMemberAggregates(qx: QueryExecutor, memberId: string) { return qx.result( ` diff --git a/services/libs/opensearch/src/apiClient.ts b/services/libs/opensearch/src/apiClient.ts index 4e3a69989b..24e60256f0 100644 --- a/services/libs/opensearch/src/apiClient.ts +++ b/services/libs/opensearch/src/apiClient.ts @@ -40,6 +40,8 @@ export class SearchSyncApiClient { public async triggerOrganizationMembersSync( tenantId: string, organizationId: string, + onboarding?: boolean, + syncFrom: Date | null = null, ): Promise { if (!organizationId) { throw new Error('organizationId is required!') @@ -47,6 +49,7 @@ export class SearchSyncApiClient { await this.searchSyncApi.post('/sync/organization/members', { organizationId, + syncFrom, }) } diff --git a/services/libs/opensearch/src/repo/member.repo.ts b/services/libs/opensearch/src/repo/member.repo.ts index fe8231ee53..26b2ed9181 100644 --- a/services/libs/opensearch/src/repo/member.repo.ts +++ b/services/libs/opensearch/src/repo/member.repo.ts @@ -48,7 +48,7 @@ export class MemberRepository extends RepositoryBase { select m.id from members m left join indexed_entities ie on m.id = ie.entity_id and ie.type = $(type) - where m."tenantId" = $(tenantId) and + where m."tenantId" = $(tenantId) and ie.entity_id is null limit ${perPage};`, { @@ -64,29 +64,37 @@ export class MemberRepository extends RepositoryBase { organizationId: string, perPage: number, lastId?: string, - ): Promise { - return await this.db().any( + syncFrom?: Date, + ): Promise { + const rows = await this.db().any( ` - select distinct mo."memberId", m."manuallyCreated" - from "memberOrganizations" mo - inner join members m on mo."memberId" = m.id - where mo."organizationId" = $(organizationId) and - mo."deletedAt" is null and - ${lastId !== undefined ? 'mo."memberId" > $(lastId) and' : ''} - m."deletedAt" is null and + SELECT + DISTINCT mo."memberId" + FROM "memberOrganizations" mo + INNER JOIN members m ON mo."memberId" = m.id + ${syncFrom !== undefined ? 'LEFT JOIN "memberSegmentsAgg" msa ON m.id = msa."memberId"' : ''} + WHERE mo."organizationId" = $(organizationId) AND + mo."deletedAt" is null AND + ${syncFrom !== undefined ? '(msa."createdAt" < $(syncFrom) OR msa."createdAt" IS NULL) AND' : ''} + ${lastId !== undefined ? 'mo."memberId" > $(lastId) AND' : ''} + m."deletedAt" is null AND exists (select 1 from "memberIdentities" where "memberId" = mo."memberId") - order by mo."memberId" - limit ${perPage};`, + ORDER BY mo."memberId" + LIMIT ${perPage}; + `, { organizationId, lastId, + syncFrom, }, ) + + return rows.map((r) => r.memberId) } public async getMemberData(memberId: string): Promise { const results = await this.db().oneOrNone( - ` + ` with to_merge_data as ( select mtm."memberId", array_agg(distinct mtm."toMergeId"::text) as to_merge_ids @@ -208,7 +216,7 @@ export class MemberRepository extends RepositoryBase { where mtk."memberId" = $(memberId) and tk."deletedAt" is null group by mtk."memberId") - select + select m.id, m."tenantId", m."displayName", @@ -254,7 +262,7 @@ export class MemberRepository extends RepositoryBase { ` select m.id as "memberId", m."manuallyCreated" from members m - where m."tenantId" = $(tenantId ) and + where m."tenantId" = $(tenantId) and m.id in ($(memberIds:csv)) and exists(select 1 from "memberIdentities" mi where mi."memberId" = m.id) `, diff --git a/services/libs/opensearch/src/service/member.sync.service.ts b/services/libs/opensearch/src/service/member.sync.service.ts index 4822711df0..abae81d299 100644 --- a/services/libs/opensearch/src/service/member.sync.service.ts +++ b/services/libs/opensearch/src/service/member.sync.service.ts @@ -10,6 +10,7 @@ import { getMemberAggregates } from '@crowd/data-access-layer/src/activities' import { cleanupMemberAggregates, fetchAbsoluteMemberAggregates, + findLastSyncDate, insertMemberSegments, } from '@crowd/data-access-layer/src/members/segments' import { IMemberSegmentAggregates } from '@crowd/data-access-layer/src/members/types' @@ -309,8 +310,12 @@ export class MemberSyncService { ) } - public async syncOrganizationMembers(organizationId: string, batchSize = 200): Promise { + public async syncOrganizationMembers( + organizationId: string, + opts: { syncFrom: Date | null } = { syncFrom: null }, + ): Promise { this.log.debug({ organizationId }, 'Syncing all organization members!') + const batchSize = 500 let docCount = 0 let memberCount = 0 @@ -318,24 +323,23 @@ export class MemberSyncService { const loadNextPage = async (lastId?: string): Promise => { this.log.info('Loading next page of organization members!', { organizationId, lastId }) - const memberIdData = await logExecutionTimeV2( - () => this.memberRepo.getOrganizationMembersForSync(organizationId, batchSize, lastId), + const memberIds = await logExecutionTimeV2( + () => + this.memberRepo.getOrganizationMembersForSync( + organizationId, + batchSize, + lastId, + opts.syncFrom, + ), this.log, `getOrganizationMembersForSync`, ) - if (memberIdData.length === 0) { + if (memberIds.length === 0) { return [] } - const membersWithActivities = await filterMembersWithActivities( - this.qdbStore.connection(), - memberIdData.map((m) => m.memberId), - ) - - return memberIdData - .filter((m) => m.manuallyCreated || membersWithActivities.includes(m.memberId)) - .map((m) => m.memberId) + return memberIds } let memberIds: string[] = await loadNextPage() @@ -344,7 +348,7 @@ export class MemberSyncService { for (let i = 0; i < memberIds.length; i++) { const memberId = memberIds[i] const { membersSynced, documentsIndexed } = await logExecutionTimeV2( - () => this.syncMembers(memberId), + () => this.syncMembers(memberId, { withAggs: true, syncFrom: opts.syncFrom }), this.log, `syncMembers (${i}/${memberIds.length})`, ) @@ -371,11 +375,22 @@ export class MemberSyncService { public async syncMembers( memberId: string, - opts: { withAggs?: boolean } = { withAggs: true }, + opts: { withAggs?: boolean; syncFrom?: Date } = { withAggs: true }, ): Promise { const qx = repoQx(this.memberRepo) const syncMemberAggregates = async (memberId) => { + if (opts.syncFrom) { + const lastSyncDate = await findLastSyncDate(qx, memberId) + if (lastSyncDate && lastSyncDate.getTime() > opts.syncFrom.getTime()) { + this.log.info( + `Skipping sync of member aggregates as last sync date is greater than syncFrom!`, + { memberId, lastSyncDate, syncFrom: opts.syncFrom }, + ) + return + } + } + let documentsIndexed = 0 let memberData: IMemberSegmentAggregates[]