Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 15 additions & 24 deletions services/libs/opensearch/src/service/member.sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { OrganizationField, findOrgById } from '@crowd/data-access-layer/src/org
import { QueryExecutor, repoQx } from '@crowd/data-access-layer/src/queryExecutor'
import { fetchManySegments } from '@crowd/data-access-layer/src/segments'
import { DbStore } from '@crowd/database'
import { Logger, getChildLogger, logExecutionTimeV2 } from '@crowd/logging'
import { Logger, getChildLogger } from '@crowd/logging'
import { RedisClient } from '@crowd/redis'
import {
IMemberAttribute,
Expand Down Expand Up @@ -323,6 +323,10 @@ export class MemberSyncService {
lastId,
)

if (memberIdData.length === 0) {
return []
}

const membersWithActivities = await filterMembersWithActivities(
this.qdbStore.connection(),
memberIdData.map((m) => m.memberId),
Expand Down Expand Up @@ -370,11 +374,7 @@ export class MemberSyncService {
let memberData: IMemberSegmentAggregates[]

try {
memberData = await logExecutionTimeV2(
async () => getMemberAggregates(this.qdbStore.connection(), memberId),
this.log,
'getMemberAggregates',
)
memberData = await getMemberAggregates(this.qdbStore.connection(), memberId)

if (memberData.length === 0) {
return
Expand Down Expand Up @@ -420,19 +420,14 @@ export class MemberSyncService {

if (memberData.length > 0) {
try {
await logExecutionTimeV2(
async () =>
this.memberRepo.transactionally(
async (txRepo) => {
const qx = repoQx(txRepo)
await cleanupMemberAggregates(qx, memberId)
await insertMemberSegments(qx, memberData)
},
undefined,
true,
),
this.log,
'insertMemberSegments',
await this.memberRepo.transactionally(
async (txRepo) => {
const qx = repoQx(txRepo)
await cleanupMemberAggregates(qx, memberId)
await insertMemberSegments(qx, memberData)
},
undefined,
true,
)

documentsIndexed += memberData.length
Expand Down Expand Up @@ -477,11 +472,7 @@ export class MemberSyncService {
const prefixed = MemberSyncService.prefixData(data, attributes)
await this.openSearchService.index(memberId, OpenSearchIndex.MEMBERS, prefixed)
}
await logExecutionTimeV2(
async () => syncMembersToOpensearchForMergeSuggestions(memberId),
this.log,
'syncMembersToOpensearchForMergeSuggestions',
)
await syncMembersToOpensearchForMergeSuggestions(memberId)

return syncResults
}
Expand Down
35 changes: 11 additions & 24 deletions services/libs/opensearch/src/service/organization.sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { OrganizationField, findOrgById } from '@crowd/data-access-layer/src/org
import { QueryExecutor, repoQx } from '@crowd/data-access-layer/src/queryExecutor'
import { fetchManySegments } from '@crowd/data-access-layer/src/segments'
import { DbStore } from '@crowd/database'
import { Logger, getChildLogger, logExecutionTime, logExecutionTimeV2 } from '@crowd/logging'
import { Logger, getChildLogger, logExecutionTime } from '@crowd/logging'
import {
IOrganizationBaseForMergeSuggestions,
IOrganizationFullAggregatesOpensearch,
Expand Down Expand Up @@ -317,11 +317,7 @@ export class OrganizationSyncService {
for (const organizationId of organizationIds) {
let orgData: IDbOrganizationAggregateData[] = []
try {
orgData = await logExecutionTimeV2(
async () => getOrgAggregates(this.qdbStore.connection(), organizationId),
this.log,
'getOrgAggregates',
)
orgData = await getOrgAggregates(this.qdbStore.connection(), organizationId)

if (orgData.length > 0) {
// get segment data to aggregate for projects and project groups
Expand Down Expand Up @@ -370,19 +366,14 @@ export class OrganizationSyncService {

if (orgData.length > 0) {
try {
await logExecutionTimeV2(
async () =>
this.writeOrgRepo.transactionally(
async (txRepo) => {
const qx = repoQx(txRepo)
await cleanupForOganization(qx, organizationId)
await insertOrganizationSegments(qx, orgData)
},
undefined,
true,
),
this.log,
'insertOrganizationSegments',
await this.writeOrgRepo.transactionally(
async (txRepo) => {
const qx = repoQx(txRepo)
await cleanupForOganization(qx, organizationId)
await insertOrganizationSegments(qx, orgData)
},
undefined,
true,
Comment on lines +369 to +376
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider enhancing error handling granularity

While the transaction implementation is solid, consider separating the error handling for cleanup and insertion operations to provide more specific error messages and potentially different recovery strategies.

 await this.writeOrgRepo.transactionally(
   async (txRepo) => {
     const qx = repoQx(txRepo)
-    await cleanupForOganization(qx, organizationId)
-    await insertOrganizationSegments(qx, orgData)
+    try {
+      await cleanupForOganization(qx, organizationId)
+    } catch (e) {
+      this.log.error(e, `Failed to cleanup organization ${organizationId}`)
+      throw e
+    }
+    try {
+      await insertOrganizationSegments(qx, orgData)
+    } catch (e) {
+      this.log.error(e, `Failed to insert segments for organization ${organizationId}`)
+      throw e
+    }
   },
   undefined,
   true,
)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await this.writeOrgRepo.transactionally(
async (txRepo) => {
const qx = repoQx(txRepo)
await cleanupForOganization(qx, organizationId)
await insertOrganizationSegments(qx, orgData)
},
undefined,
true,
await this.writeOrgRepo.transactionally(
async (txRepo) => {
const qx = repoQx(txRepo)
try {
await cleanupForOganization(qx, organizationId)
} catch (e) {
this.log.error(e, `Failed to cleanup organization ${organizationId}`)
throw e
}
try {
await insertOrganizationSegments(qx, orgData)
} catch (e) {
this.log.error(e, `Failed to insert segments for organization ${organizationId}`)
throw e
}
},
undefined,
true,

)

organizationIdsToIndex.push(organizationId)
Expand Down Expand Up @@ -432,11 +423,7 @@ export class OrganizationSyncService {
organizationsSynced: organizationIds.length,
}
}
await logExecutionTimeV2(
async () => syncOrgsToOpensearchForMergeSuggestions(syncResults.organizationIdsToIndex),
this.log,
'syncOrgsToOpensearchForMergeSuggestions',
)
await syncOrgsToOpensearchForMergeSuggestions(syncResults.organizationIdsToIndex)

return syncResults
}
Expand Down