4 changes: 2 additions & 2 deletions services/apps/data_sink_worker/src/queue/index.ts
@@ -30,7 +30,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever
level,
client,
client.getQueueChannelConfig(CrowdQueue.DATA_SINK_WORKER),
3,
Number(process.env.WORKER_MAX_CONCURRENCY || 1),
💡 Codebase verification

Environment variable validation is needed for worker concurrency

The review suggestion is valid. The current implementation accepts potentially problematic values (negative numbers, non-integers, or NaN), which could affect the stability of the queue processing system. The suggested validation guarantees a positive integer while preserving the fallback behavior.

🔗 Analysis chain

Validate 'WORKER_MAX_CONCURRENCY' environment variable

The concurrency level is set using Number(process.env.WORKER_MAX_CONCURRENCY || 1). If WORKER_MAX_CONCURRENCY is not a valid number or is less than 1, it may lead to unexpected behavior.

Consider adding validation to ensure that the concurrency level is a positive integer. For example:

+const maxConcurrencyEnv = Number(process.env.WORKER_MAX_CONCURRENCY || 1)
+const maxConcurrency = Number.isInteger(maxConcurrencyEnv) && maxConcurrencyEnv > 0 ? maxConcurrencyEnv : 1

 super(
   level,
   client,
   client.getQueueChannelConfig(CrowdQueue.DATA_SINK_WORKER),
-  Number(process.env.WORKER_MAX_CONCURRENCY || 1),
+  maxConcurrency,
   parentLog,
   undefined,
   undefined,
   10,
 )
🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Get the complete implementation of the queue file
cat services/apps/data_sink_worker/src/queue/index.ts

# Look for similar environment variable validations
rg "process\.env\..*Number\(" -A 2 -B 2

# Search for any existing number validation patterns
ast-grep --pattern 'Number.isInteger($_)'

Length of output: 2971
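
Beyond the inline fix, this kind of parsing could live in a small shared helper so other workers get the same guarantees. The sketch below is illustrative only; the parsePositiveIntEnv helper is hypothetical and not part of this PR or the existing codebase:

const parsePositiveIntEnv = (name: string, fallback: number): number => {
  // Missing or empty values fall back immediately
  const raw = process.env[name]
  if (!raw) {
    return fallback
  }

  const parsed = Number(raw)
  // Reject NaN, non-integers, and values below 1 so the queue never starts with a broken concurrency level
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback
}

// Hypothetical usage in the receiver constructor:
// const maxConcurrency = parsePositiveIntEnv('WORKER_MAX_CONCURRENCY', 1)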

parentLog,
undefined,
undefined,
@@ -58,7 +58,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever
break
case DataSinkWorkerQueueMessageType.CREATE_AND_PROCESS_ACTIVITY_RESULT: {
const msg = message as CreateAndProcessActivityResultQueueMessage
await service.createAndProcessActivityResult(
await service.processActivityInMemoryResult(
msg.tenantId,
msg.segmentId,
msg.integrationId,
38 changes: 26 additions & 12 deletions services/apps/data_sink_worker/src/service/dataSink.service.ts
@@ -1,4 +1,4 @@
import { addSeconds } from '@crowd/common'
import { addSeconds, generateUUIDv1 } from '@crowd/common'
import { DataSinkWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/common_services'
import { DbStore } from '@crowd/data-access-layer/src/database'
import { IResultData } from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.data'
@@ -42,6 +42,7 @@ export default class DataSinkService extends LoggerBase {

private async triggerResultError(
resultInfo: IResultData,
isCreated: boolean,
location: string,
message: string,
metadata?: unknown,
@@ -63,9 +64,15 @@ export default class DataSinkService extends LoggerBase {
// delay for #retries * 2 minutes
const until = addSeconds(new Date(), (resultInfo.retries + 1) * 2 * 60)
this.log.warn({ until: until.toISOString() }, 'Retrying result!')
await this.repo.delayResult(resultInfo.id, until, errorData)

await this.repo.delayResult(
resultInfo.id,
until,
errorData,
isCreated ? undefined : resultInfo,
)
} else {
await this.repo.markResultError(resultInfo.id, errorData)
await this.repo.markResultError(resultInfo.id, errorData, isCreated ? undefined : resultInfo)
}
}

@@ -98,27 +105,29 @@ export default class DataSinkService extends LoggerBase {
}
}

public async createAndProcessActivityResult(
public async processActivityInMemoryResult(
tenantId: string,
segmentId: string,
integrationId: string,
data: IActivityData,
): Promise<void> {
this.log.info({ tenantId, segmentId }, 'Creating and processing activity result.')
this.log.info({ tenantId, segmentId }, 'Processing in memory activity result.')

const payload = {
type: IntegrationResultType.ACTIVITY,
data,
segmentId,
}

const [integration, resultId] = await Promise.all([
integrationId ? this.repo.getIntegrationInfo(integrationId) : Promise.resolve(null),
this.repo.createResult(tenantId, integrationId, payload),
])
let integration

if (integrationId) {
integration = await this.repo.getIntegrationInfo(integrationId)
}

const id = generateUUIDv1()
const result: IResultData = {
id: resultId,
id,
tenantId,
integrationId,
data: payload,
@@ -132,7 +141,7 @@ export default class DataSinkService extends LoggerBase {
onboarding: false,
}

await this.processResult(resultId, result)
await this.processResult(id, result)
}

public async processResult(resultId: string, result?: IResultData): Promise<boolean> {
@@ -263,13 +272,18 @@ export default class DataSinkService extends LoggerBase {
type: data.type,
},
)
await this.repo.deleteResult(resultId)

if (!result) {
await this.repo.deleteResult(resultId)
}

return true
} catch (err) {
this.log.error(err, 'Error processing result.')
try {
await this.triggerResultError(
resultInfo,
result === undefined,
'process-result',
'Error processing result.',
undefined,
@@ -1,7 +1,7 @@
import { distinct, singleOrDefault } from '@crowd/common'
import { DbStore, RepositoryBase } from '@crowd/database'
import { Logger } from '@crowd/logging'
import { IIntegrationResult, IntegrationResultState } from '@crowd/types'
import { IntegrationResultState } from '@crowd/types'

import { IDelayedResults, IFailedResultData, IIntegrationData, IResultData } from './dataSink.data'

Expand Down Expand Up @@ -46,28 +46,6 @@ export default class DataSinkRepository extends RepositoryBase<DataSinkRepositor
return result
}

public async createResult(
tenantId: string,
integrationId: string,
result: IIntegrationResult,
): Promise<string> {
const results = await this.db().one(
`
insert into integration.results(state, data, "tenantId", "integrationId")
values($(state), $(data), $(tenantId), $(integrationId))
returning id;
`,
{
tenantId,
integrationId,
state: IntegrationResultState.PENDING,
data: JSON.stringify(result),
},
)

return results.id
}

public async getOldResultsToProcessForTenant(
tenantId: string,
limit: number,
@@ -147,22 +125,44 @@ export default class DataSinkRepository extends RepositoryBase<DataSinkRepositor
}
}

public async markResultError(resultId: string, error: unknown): Promise<void> {
const result = await this.db().result(
`update integration.results
set state = $(state),
"processedAt" = now(),
error = $(error),
"updatedAt" = now()
where id = $(resultId)`,
{
resultId,
state: IntegrationResultState.ERROR,
error: JSON.stringify(error),
},
)
public async markResultError(
resultId: string,
error: unknown,
resultToCreate?: IResultData,
): Promise<void> {
if (resultToCreate) {
const result = await this.db().result(
`
insert into integration.results(state, data, "tenantId", "integrationId", error)
values($(state), $(data), $(tenantId), $(integrationId), $(error))
`,
{
tenantId: resultToCreate.tenantId,
integrationId: resultToCreate.integrationId,
state: IntegrationResultState.ERROR,
data: JSON.stringify(resultToCreate.data),
error: JSON.stringify(error),
},
)

this.checkUpdateRowCount(result.rowCount, 1)
this.checkUpdateRowCount(result.rowCount, 1)
} else {
const result = await this.db().result(
`update integration.results
set state = $(state),
"processedAt" = now(),
error = $(error),
"updatedAt" = now()
where id = $(resultId)`,
{
resultId,
state: IntegrationResultState.ERROR,
error: JSON.stringify(error),
},
)

this.checkUpdateRowCount(result.rowCount, 1)
}
}

public async deleteResult(resultId: string): Promise<void> {
@@ -266,24 +266,48 @@ export default class DataSinkRepository extends RepositoryBase<DataSinkRepositor
return result.map((r) => r.id)
}

public async delayResult(resultId: string, until: Date, error: unknown): Promise<void> {
const result = await this.db().result(
`update integration.results
set state = $(state),
error = $(error),
"delayedUntil" = $(until),
retries = coalesce(retries, 0) + 1,
"updatedAt" = now()
where id = $(resultId)`,
{
resultId,
until,
error: JSON.stringify(error),
state: IntegrationResultState.DELAYED,
},
)
public async delayResult(
resultId: string,
until: Date,
error: unknown,
resultToCreate?: IResultData,
): Promise<void> {
if (resultToCreate) {
const result = await this.db().result(
`
insert into integration.results(state, data, "tenantId", "integrationId", error, retries, "delayedUntil")
values($(state), $(data), $(tenantId), $(integrationId), $(error), $(retries), $(until))
`,
{
tenantId: resultToCreate.tenantId,
integrationId: resultToCreate.integrationId,
state: IntegrationResultState.DELAYED,
data: JSON.stringify(resultToCreate.data),
retries: 1,
error: JSON.stringify(error),
until: until,
},
)
this.checkUpdateRowCount(result.rowCount, 1)
} else {
const result = await this.db().result(
`update integration.results
set state = $(state),
error = $(error),
"delayedUntil" = $(until),
retries = coalesce(retries, 0) + 1,
"updatedAt" = now()
where id = $(resultId)`,
{
resultId,
until,
error: JSON.stringify(error),
state: IntegrationResultState.DELAYED,
},
)

this.checkUpdateRowCount(result.rowCount, 1)
this.checkUpdateRowCount(result.rowCount, 1)
}
}

public async getDelayedResults(limit: number): Promise<IDelayedResults[]> {
65 changes: 64 additions & 1 deletion services/libs/queue/src/vendors/kafka/client.ts
@@ -42,6 +42,38 @@ export class KafkaQueueService extends LoggerBase implements IQueue {
this.reconnectAttempts = new Map<string, number>()
this.consumerStatus = new Map<string, boolean>()
}
async getQueueMessageCount(conf: IKafkaChannelConfig): Promise<number> {
const groupId = conf.name
const topic = conf.name

const admin = this.client.admin()
await admin.connect()

try {
const topicOffsets = await admin.fetchTopicOffsets(topic)
const offsetsResponse = await admin.fetchOffsets({
groupId: groupId,
topics: [topic],
})

const offsets = offsetsResponse[0].partitions

let totalLeft = 0
for (const offset of offsets) {
const topicOffset = topicOffsets.find((p) => p.partition === offset.partition)
if (topicOffset.offset !== offset.offset) {
totalLeft += Number(topicOffset.offset) - Number(offset.offset)
}
}

return totalLeft
} catch (err) {
this.log.error(err, 'Failed to get message count!')
throw err
} finally {
await admin.disconnect()
}
}

public async send(
channel: IQueueChannel,
@@ -294,6 +326,7 @@ export class KafkaQueueService extends LoggerBase implements IQueue {
let retries = options?.retry || 0

let healthCheckInterval
let statisticsInterval

try {
this.started = true
@@ -318,6 +351,33 @@ export class KafkaQueueService extends LoggerBase implements IQueue {
}
}, 10 * 60000) // Check every 10 minutes

let timings = []

statisticsInterval = setInterval(async () => {
if (!this.started) {
clearInterval(statisticsInterval)
return
}

try {
// Reset the timings array and calculate the average processing time
const durations = [...timings]
timings = []

// Get the number of messages left in the queue
const count = await this.getQueueMessageCount(queueConf)

let message = `Topic has ${count} messages left!`
if (durations.length > 0) {
const average = durations.reduce((a, b) => a + b, 0) / durations.length
message += ` In the last minute ${durations.length} messages were processed (${(durations.length / 60.0).toFixed(2)} msg/s) - average processing time: ${average.toFixed(2)}ms!`
}
this.log.info({ topic: queueConf.name }, message)
} catch (err) {
// do nothing
}
}, 60000) // check every minute
Comment on lines +354 to +379

🛠️ Refactor suggestion

Improve error handling and memory management in statistics tracking.

  1. The empty catch block could hide important errors.
  2. The timings array should be size-bounded to prevent memory issues.

Consider these improvements:

-  let timings = []
+  const MAX_TIMINGS = 1000; // Prevent unbounded growth
+  let timings: number[] = [];

   statisticsInterval = setInterval(async () => {
     if (!this.started) {
       clearInterval(statisticsInterval)
       return
     }

     try {
       // Reset the timings array and calculate the average processing time
       const durations = [...timings]
       timings = []

       // Get the number of messages left in the queue
       const count = await this.getQueueMessageCount(queueConf)

       let message = `Topic has ${count} messages left!`
       if (durations.length > 0) {
         const average = durations.reduce((a, b) => a + b, 0) / durations.length
         message += ` In the last minute ${durations.length} messages were processed (${(
           durations.length / 60.0
         ).toFixed(2)} msg/s) - average processing time: ${average.toFixed(2)}ms!`
       }
       this.log.info({ topic: queueConf.name }, message)
     } catch (err) {
-      // do nothing
+      this.log.error({ err, topic: queueConf.name }, 'Failed to collect queue statistics')
     }
   }, 60000)
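
One way to address both points at once is to move the timings into a small bounded collector that the interval drains once per minute. This is only a sketch of the idea; the BoundedTimings class and the 1000-entry cap are assumptions, not code that exists in this PR:

class BoundedTimings {
  private values: number[] = []

  constructor(private readonly maxSize = 1000) {}

  // Record a duration; once the buffer is full, further samples are dropped to keep memory bounded
  push(durationMs: number): void {
    if (this.values.length < this.maxSize) {
      this.values.push(durationMs)
    }
  }

  // Hand back everything collected so far and reset for the next window
  drain(): number[] {
    const drained = this.values
    this.values = []
    return drained
  }
}

// Hypothetical usage: the statistics interval calls timings.drain() every minute,
// while the message success/error handlers call timings.push(performance.now() - now).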

this.log.trace({ topic: queueConf.name }, 'Subscribed to topic! Starting the consmer...')

await consumer.run({
Expand All @@ -334,10 +394,12 @@ export class KafkaQueueService extends LoggerBase implements IQueue {
processMessage(data)
.then(() => {
const duration = performance.now() - now
this.log.info(`Message processed successfully in ${duration.toFixed(2)}ms!`)
timings.push(duration)
this.log.debug(`Message processed successfully in ${duration.toFixed(2)}ms!`)
Comment on lines +397 to +398

🛠️ Refactor suggestion

Consider adding a size-bounds check when collecting timings.

Protect against memory issues by limiting the size of the timings array.

Apply this change to both success and error handlers:

 .then(() => {
   const duration = performance.now() - now
-  timings.push(duration)
+  if (timings.length < MAX_TIMINGS) {
+    timings.push(duration)
+  }
   this.log.debug(`Message processed successfully in ${duration.toFixed(2)}ms!`)
 })
 .catch((err) => {
   const duration = performance.now() - now
-  timings.push(duration)
+  if (timings.length < MAX_TIMINGS) {
+    timings.push(duration)
+  }
   this.log.error(err, `Message processed unsuccessfully in ${duration.toFixed(2)}ms!`)
 })

Also applies to: 402-403

})
.catch((err) => {
const duration = performance.now() - now
timings.push(duration)
this.log.error(err, `Message processed unsuccessfully in ${duration.toFixed(2)}ms!`)
})
.finally(() => {
Expand All @@ -349,6 +411,7 @@ export class KafkaQueueService extends LoggerBase implements IQueue {
} catch (e) {
this.log.trace({ topic: queueConf.name, error: e }, 'Failed to start the queue!')
clearInterval(healthCheckInterval)
clearInterval(statisticsInterval)
if (retries < MAX_RETRY_FOR_CONNECTING_CONSUMER) {
retries++
this.log.trace({ topic: queueConf.name, retries }, 'Retrying to start the queue...')