Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE "memberSegmentsAgg" ADD COLUMN IF NOT EXISTS "createdAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW();
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ export async function recalculateActivityAffiliationsOfOrganizationSynchronous(
)
}

export async function syncOrganization(organizationId: string): Promise<void> {
export async function syncOrganization(organizationId: string, syncStart: Date): Promise<void> {
const syncApi = new SearchSyncApiClient({
baseUrl: process.env['CROWD_SEARCH_SYNC_API_URL'],
})

await syncApi.triggerOrganizationSync(organizationId)
await syncApi.triggerOrganizationMembersSync(null, organizationId)
await syncApi.triggerOrganizationMembersSync(null, organizationId, null, syncStart)
}

export async function notifyFrontendOrganizationUnmergeSuccessful(
Expand Down
10 changes: 6 additions & 4 deletions services/apps/entity_merging_worker/src/workflows/all.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const {
notifyFrontendMemberUnmergeSuccessful,
syncRemoveMember,
} = proxyActivities<typeof activities>({
startToCloseTimeout: '15 minutes',
startToCloseTimeout: '60 minutes',
})

export async function finishMemberMerging(
Expand Down Expand Up @@ -99,7 +99,8 @@ export async function finishOrganizationMerging(

await moveActivitiesBetweenOrgs(primaryId, secondaryId, tenantId)

await syncOrganization(primaryId)
const syncStart = new Date()
await syncOrganization(primaryId, syncStart)
await deleteOrganization(secondaryId)
await setMergeAction(primaryId, secondaryId, tenantId, {
state: 'merged' as MergeActionState,
Expand Down Expand Up @@ -128,8 +129,9 @@ export async function finishOrganizationUnmerging(
})
await recalculateActivityAffiliationsOfOrganizationSynchronous(primaryId, tenantId)
await recalculateActivityAffiliationsOfOrganizationSynchronous(secondaryId, tenantId)
await syncOrganization(primaryId)
await syncOrganization(secondaryId)
const syncStart = new Date()
await syncOrganization(primaryId, syncStart)
await syncOrganization(secondaryId, syncStart)
Comment on lines +132 to +134
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Evaluate shared syncStart timestamp for sequential syncs

Both organizations use the same syncStart timestamp, but the syncs are performed sequentially. This means the second sync (secondaryId) might miss updates that occurred during the first sync (primaryId).

Consider this alternative implementation:

-  const syncStart = new Date()
-  await syncOrganization(primaryId, syncStart)
-  await syncOrganization(secondaryId, syncStart)
+  const primarySyncStart = new Date()
+  await syncOrganization(primaryId, primarySyncStart)
+  const secondarySyncStart = new Date()
+  await syncOrganization(secondaryId, secondarySyncStart)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const syncStart = new Date()
await syncOrganization(primaryId, syncStart)
await syncOrganization(secondaryId, syncStart)
const primarySyncStart = new Date()
await syncOrganization(primaryId, primarySyncStart)
const secondarySyncStart = new Date()
await syncOrganization(secondaryId, secondarySyncStart)

await setMergeAction(primaryId, secondaryId, tenantId, {
state: 'unmerged' as MergeActionState,
step: MergeActionStep.UNMERGE_DONE,
Expand Down
6 changes: 4 additions & 2 deletions services/apps/search_sync_api/src/routes/member.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ router.post(
asyncWrap(async (req: ApiRequest, res) => {
const memberSyncService = syncService(req)

const { organizationId } = req.body
const { organizationId, syncFrom } = req.body
try {
req.log.trace(
`Calling memberSyncService.syncOrganizationMembers for organization ${organizationId}`,
)
await memberSyncService.syncOrganizationMembers(organizationId)
await memberSyncService.syncOrganizationMembers(organizationId, {
syncFrom: syncFrom ? new Date(syncFrom) : null,
})
res.sendStatus(200)
} catch (error) {
req.log.error(error)
Expand Down
8 changes: 8 additions & 0 deletions services/libs/data-access-layer/src/members/segments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ import { IMemberAbsoluteAggregates, IMemberSegmentAggregates } from './types'

const log = getServiceChildLogger('organizations/segments')

export async function findLastSyncDate(qx: QueryExecutor, memberId: string): Promise<Date | null> {
const result = await qx.selectOneOrNone(
`SELECT MAX("createdAt") AS "lastSyncDate" FROM "memberSegmentsAgg" WHERE "memberId" = $(memberId)`,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Add index for memberId and createdAt columns to optimize the MAX aggregation query

The memberSegmentsAgg table has the createdAt column properly defined, but it lacks an index that would optimize the specific query that filters by memberId and performs MAX(createdAt). Current indexes:

  • A unique constraint on (memberId, segmentId)
  • An index on segmentId

To optimize the query performance, you should add a composite index on (memberId, createdAt).

🔗 Analysis chain

Verify database schema and indexing.

Let's verify the existence of the createdAt column and check if there's an index to optimize the MAX aggregation query.


Let me generate another verification script to check for indexes specifically on the memberSegmentsAgg table, as the previous results didn't show them clearly.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check schema and indexes for memberSegmentsAgg table

# Check if createdAt column exists and its definition
echo "Checking createdAt column definition:"
rg -A 5 '"memberSegmentsAgg".*CREATE TABLE|ALTER TABLE.*"memberSegmentsAgg".*ADD.*"createdAt"'

# Check for indexes on memberId and createdAt
echo "Checking indexes:"
rg -A 5 'CREATE.*INDEX.*ON.*"memberSegmentsAgg".*\("memberId"|"createdAt"\)'

Length of output: 10157


Script:

#!/bin/bash
# Search for memberSegmentsAgg table creation and its indexes
echo "Checking memberSegmentsAgg table creation and indexes:"
rg -A 10 'CREATE TABLE.*"memberSegmentsAgg"|CREATE.*INDEX.*"memberSegmentsAgg"'

# Search for any migration file containing memberSegmentsAgg
echo "Checking migrations containing memberSegmentsAgg:"
fd -e sql . | xargs rg "memberSegmentsAgg"

Length of output: 2540

{ memberId },
)
return result?.lastSyncDate ? new Date(result.lastSyncDate) : null
}

export async function cleanupMemberAggregates(qx: QueryExecutor, memberId: string) {
return qx.result(
`
Expand Down
3 changes: 3 additions & 0 deletions services/libs/opensearch/src/apiClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@ export class SearchSyncApiClient {
public async triggerOrganizationMembersSync(
tenantId: string,
organizationId: string,
onboarding?: boolean,
syncFrom: Date | null = null,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Update needed: triggerOrganizationMembersSync calls need to be aligned with the new signature

The verification revealed inconsistencies in the usage of triggerOrganizationMembersSync:

  • In backend/src/services/searchSyncService.ts: The method is called with three arguments (tenantId, organizationId, and a hardcoded false for onboarding); syncFrom is not passed
  • In services/apps/entity_merging_worker/src/activities/organizations.ts: The method is called with null as tenantId and includes syncStart
  • In services/libs/common_services/src/services/emitters/searchSyncWorker.emitter.ts: The method signature needs to be checked for alignment

Please update all call sites to match the new signature with the optional syncFrom parameter.

🔗 Analysis chain

LGTM! Verify the parameter usage across the codebase.

The addition of the optional syncFrom parameter with a default value maintains backward compatibility while enabling optimized syncing.

Let's verify the parameter usage across the codebase:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify all calls to triggerOrganizationMembersSync to ensure they're updated correctly

# Search for method calls
ast-grep --pattern 'triggerOrganizationMembersSync($$$)'

# Search for the route handler to verify parameter handling
rg -A 5 "'/sync/organization/members'" 

Length of output: 1581


Script:

#!/bin/bash
# Let's search for actual calls to the method in TypeScript/JavaScript files
rg -t ts -t js "triggerOrganizationMembersSync" -B 2 -A 2

# Also search for any test files that might be using this method
fd -e test.ts -e spec.ts -X rg "triggerOrganizationMembersSync"

Length of output: 2357

): Promise<void> {
if (!organizationId) {
throw new Error('organizationId is required!')
}

await this.searchSyncApi.post('/sync/organization/members', {
organizationId,
syncFrom,
})
}

Expand Down
38 changes: 23 additions & 15 deletions services/libs/opensearch/src/repo/member.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class MemberRepository extends RepositoryBase<MemberRepository> {
select m.id
from members m
left join indexed_entities ie on m.id = ie.entity_id and ie.type = $(type)
where m."tenantId" = $(tenantId) and
where m."tenantId" = $(tenantId) and
ie.entity_id is null
limit ${perPage};`,
{
Expand All @@ -64,29 +64,37 @@ export class MemberRepository extends RepositoryBase<MemberRepository> {
organizationId: string,
perPage: number,
lastId?: string,
): Promise<IMemberIdData[]> {
return await this.db().any(
syncFrom?: Date,
): Promise<string[]> {
const rows = await this.db().any(
`
select distinct mo."memberId", m."manuallyCreated"
from "memberOrganizations" mo
inner join members m on mo."memberId" = m.id
where mo."organizationId" = $(organizationId) and
mo."deletedAt" is null and
${lastId !== undefined ? 'mo."memberId" > $(lastId) and' : ''}
m."deletedAt" is null and
SELECT
DISTINCT mo."memberId"
FROM "memberOrganizations" mo
INNER JOIN members m ON mo."memberId" = m.id
${syncFrom !== undefined ? 'LEFT JOIN "memberSegmentsAgg" msa ON m.id = msa."memberId"' : ''}
WHERE mo."organizationId" = $(organizationId) AND
mo."deletedAt" is null AND
${syncFrom !== undefined ? '(msa."createdAt" < $(syncFrom) OR msa."createdAt" IS NULL) AND' : ''}
${lastId !== undefined ? 'mo."memberId" > $(lastId) AND' : ''}
m."deletedAt" is null AND
exists (select 1 from "memberIdentities" where "memberId" = mo."memberId")
order by mo."memberId"
limit ${perPage};`,
ORDER BY mo."memberId"
LIMIT ${perPage};
`,
Comment on lines +69 to +84
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Avoid SQL Injection Risks with Safe Query Construction

The SQL query is conditionally constructed using template literals and string interpolation, which can introduce SQL injection risks if not handled carefully. Even though parameters are used, it's safer to use query builders or parameterized query methods to construct SQL queries dynamically.

Consider refactoring the query to build the conditional clauses securely. Here's an example using parameterized conditions:

const conditions = [
  'mo."organizationId" = $(organizationId)',
  'mo."deletedAt" IS NULL',
  'm."deletedAt" IS NULL',
  'EXISTS (SELECT 1 FROM "memberIdentities" WHERE "memberId" = mo."memberId")',
];

if (syncFrom !== undefined) {
  conditions.push('(msa."createdAt" < $(syncFrom) OR msa."createdAt" IS NULL)');
}

if (lastId !== undefined) {
  conditions.push('mo."memberId" > $(lastId)');
}

const query = `
  SELECT DISTINCT mo."memberId"
  FROM "memberOrganizations" mo
  INNER JOIN members m ON mo."memberId" = m.id
  ${syncFrom !== undefined ? 'LEFT JOIN "memberSegmentsAgg" msa ON m.id = msa."memberId"' : ''}
  WHERE ${conditions.join(' AND ')}
  ORDER BY mo."memberId"
  LIMIT ${perPage};
`;

const rows = await this.db().any(query, {
  organizationId,
  syncFrom,
  lastId,
});

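This approach keeps every dynamic value (organizationId, syncFrom, lastId) in named parameters and makes the conditional clauses easier to read and maintain. Note that perPage and the optional JOIN fragment are still interpolated into the query text, so perPage must remain a trusted numeric value.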

{
organizationId,
lastId,
syncFrom,
},
)

return rows.map((r) => r.memberId)
}

public async getMemberData(memberId: string): Promise<IDbMemberSyncData[]> {
const results = await this.db().oneOrNone(
`
`
with to_merge_data as (
select mtm."memberId",
array_agg(distinct mtm."toMergeId"::text) as to_merge_ids
Expand Down Expand Up @@ -208,7 +216,7 @@ export class MemberRepository extends RepositoryBase<MemberRepository> {
where mtk."memberId" = $(memberId)
and tk."deletedAt" is null
group by mtk."memberId")
select
select
m.id,
m."tenantId",
m."displayName",
Expand Down Expand Up @@ -254,7 +262,7 @@ export class MemberRepository extends RepositoryBase<MemberRepository> {
`
select m.id as "memberId", m."manuallyCreated"
from members m
where m."tenantId" = $(tenantId ) and
where m."tenantId" = $(tenantId) and
m.id in ($(memberIds:csv)) and
exists(select 1 from "memberIdentities" mi where mi."memberId" = m.id)
`,
Expand Down
43 changes: 29 additions & 14 deletions services/libs/opensearch/src/service/member.sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { getMemberAggregates } from '@crowd/data-access-layer/src/activities'
import {
cleanupMemberAggregates,
fetchAbsoluteMemberAggregates,
findLastSyncDate,
insertMemberSegments,
} from '@crowd/data-access-layer/src/members/segments'
import { IMemberSegmentAggregates } from '@crowd/data-access-layer/src/members/types'
Expand Down Expand Up @@ -309,33 +310,36 @@ export class MemberSyncService {
)
}

public async syncOrganizationMembers(organizationId: string, batchSize = 200): Promise<void> {
public async syncOrganizationMembers(
organizationId: string,
opts: { syncFrom: Date | null } = { syncFrom: null },
): Promise<void> {
this.log.debug({ organizationId }, 'Syncing all organization members!')
const batchSize = 500
let docCount = 0
let memberCount = 0

const now = new Date()

const loadNextPage = async (lastId?: string): Promise<string[]> => {
this.log.info('Loading next page of organization members!', { organizationId, lastId })
const memberIdData = await logExecutionTimeV2(
() => this.memberRepo.getOrganizationMembersForSync(organizationId, batchSize, lastId),
const memberIds = await logExecutionTimeV2(
() =>
this.memberRepo.getOrganizationMembersForSync(
organizationId,
batchSize,
lastId,
opts.syncFrom,
),
Comment on lines +313 to +333
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Based on the search results, I can now generate the final response:

Update required for search_sync_worker's syncOrganizationMembers call

The syncFrom parameter is correctly propagated in the search_sync_api implementation, but the search_sync_worker's call at services/apps/search_sync_worker/src/queue/index.ts needs to be updated to include the new syncFrom parameter:

  • services/apps/search_sync_worker/src/queue/index.ts: Update .syncOrganizationMembers(data.organizationId) to include the options parameter

Note: The syncOrganizationMembers implementation in integration_sync_worker is a different method with different parameters, so it's not affected by this change.

🔗 Analysis chain

Verify the propagation of syncFrom parameter

The syncFrom parameter is correctly propagated to downstream calls. Let's verify there are no other callers that need to be updated.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for direct calls to getOrganizationMembersForSync to ensure all callers pass the syncFrom parameter
rg "getOrganizationMembersForSync" -A 5 -B 5

# Search for calls to syncOrganizationMembers to verify they're updated to use the new parameter
rg "syncOrganizationMembers" -A 5 -B 5

Length of output: 11134

this.log,
`getOrganizationMembersForSync`,
)

if (memberIdData.length === 0) {
if (memberIds.length === 0) {
return []
}

const membersWithActivities = await filterMembersWithActivities(
this.qdbStore.connection(),
memberIdData.map((m) => m.memberId),
)

return memberIdData
.filter((m) => m.manuallyCreated || membersWithActivities.includes(m.memberId))
.map((m) => m.memberId)
return memberIds
}

let memberIds: string[] = await loadNextPage()
Expand All @@ -344,7 +348,7 @@ export class MemberSyncService {
for (let i = 0; i < memberIds.length; i++) {
const memberId = memberIds[i]
const { membersSynced, documentsIndexed } = await logExecutionTimeV2(
() => this.syncMembers(memberId),
() => this.syncMembers(memberId, { withAggs: true, syncFrom: opts.syncFrom }),
this.log,
`syncMembers (${i}/${memberIds.length})`,
)
Expand All @@ -371,11 +375,22 @@ export class MemberSyncService {

public async syncMembers(
memberId: string,
opts: { withAggs?: boolean } = { withAggs: true },
opts: { withAggs?: boolean; syncFrom?: Date } = { withAggs: true },
): Promise<IMemberSyncResult> {
const qx = repoQx(this.memberRepo)

const syncMemberAggregates = async (memberId) => {
if (opts.syncFrom) {
const lastSyncDate = await findLastSyncDate(qx, memberId)
if (lastSyncDate && lastSyncDate.getTime() > opts.syncFrom.getTime()) {
this.log.info(
`Skipping sync of member aggregates as last sync date is greater than syncFrom!`,
{ memberId, lastSyncDate, syncFrom: opts.syncFrom },
)
return
}
}

let documentsIndexed = 0
let memberData: IMemberSegmentAggregates[]

Expand Down
Loading