Test queue perf #2763
Conversation
Important: Review skipped. Auto incremental reviews are disabled on this repository; check the settings in the CodeRabbit UI or the repository's CodeRabbit configuration file, where this status message can also be disabled.

Walkthrough

The pull request introduces modifications across multiple services to enhance configurability and refine result processing logic. The changes primarily focus on the data sink worker service and the queue library.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Worker as WorkerQueueReceiver
    participant Service as DataSinkService
    participant Repo as DataSinkRepository
    Worker->>Service: processActivityInMemoryResult()
    Service->>Service: Generate Result ID
    Service->>Repo: Mark Result/Trigger Error
    alt Result Exists
        Repo-->>Service: Update Result
    else Result Not Exists
        Repo-->>Service: Create New Result
    end
    Service-->>Worker: Return Processing Result
```
Actionable comments posted: 3
🧹 Nitpick comments (4)
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts (2)
Line range hint 4-4: Remove unused import 'IIntegrationResult'

The import `IIntegrationResult` at line 4 is defined but never used, leading to a linter warning in the pipeline. Apply this diff to remove the unused import:
```diff
-import { IIntegrationResult, IntegrationResultState } from '@crowd/types'
+import { IntegrationResultState } from '@crowd/types'
```
128-165: Refactor duplicated logic in 'markResultError' and 'delayResult' methods

The methods `markResultError` and `delayResult` contain similar logic when handling the `resultToCreate` parameter, including database insertion and update operations. Consider abstracting the common code into a private helper method to reduce duplication and improve maintainability; a sketch follows below.
Also applies to: 269-310
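A minimal sketch of such a helper, with hypothetical method names, SQL, and a pg-promise-style accessor (the actual repository uses its own query helpers):

```ts
// Hypothetical sketch only: both markResultError and delayResult would call
// this instead of repeating the insert-or-update branching. The SQL, table,
// and db accessor are illustrative, not the repository's actual implementation.
private async upsertResultState(
  resultId: string,
  state: IntegrationResultState,
  resultToCreate?: IIntegrationResult,
): Promise<void> {
  if (resultToCreate) {
    // The result only exists in memory so far: persist it together with
    // the desired state in a single insert.
    await this.db().none(
      `insert into integration.results (id, state, data)
       values ($(resultId), $(state), $(data))`,
      { resultId, state, data: JSON.stringify(resultToCreate) },
    )
  } else {
    // The result row already exists: only its state needs updating.
    await this.db().none(
      `update integration.results set state = $(state), "updatedAt" = now()
       where id = $(resultId)`,
      { resultId, state },
    )
  }
}
```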
services/apps/data_sink_worker/src/service/dataSink.service.ts (1)
Line range hint 45-73: Clarify the use of 'isCreated' flag in 'triggerResultError' method

The `isCreated` parameter is used to decide whether to pass `undefined` or `resultInfo` to the `markResultError` and `delayResult` methods. This logic can be confusing and may lead to unintended behavior. Consider refactoring or adding comments to make the intent behind the `isCreated` flag clearer; this will help future maintainers understand when and why a new result is created versus updating an existing one. A commented sketch follows below.
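One option, sketched here with hypothetical names, is to replace the bare boolean with a self-describing union so call sites read unambiguously:

```ts
// Hypothetical refactor sketch; names are illustrative, not the service's API.
// 'existing' means the result row is already persisted and only needs updating;
// 'in-memory' means the row must be created together with the error state.
type ResultOrigin = 'existing' | 'in-memory'

interface IResultInfo {
  data: unknown
}

// Stub standing in for the repository's markResultError method.
declare function markResultError(resultId: string, toCreate?: IResultInfo): Promise<void>

async function triggerResultError(
  resultId: string,
  origin: ResultOrigin,
  resultInfo: IResultInfo,
): Promise<void> {
  // Pass the payload only when the row still has to be created;
  // an existing row is updated in place.
  const resultToCreate = origin === 'in-memory' ? resultInfo : undefined
  await markResultError(resultId, resultToCreate)
}
```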
services/libs/queue/src/vendors/kafka/client.ts (1)
45-76: Consider caching the admin client connection.

The method creates and closes an admin client connection for each call. For frequent statistics gathering, this could be inefficient.
Consider maintaining a single admin client instance at the class level:
```diff
 export class KafkaQueueService extends LoggerBase implements IQueue {
+  private adminClient: Admin;
+
+  private async getAdminClient(): Promise<Admin> {
+    if (!this.adminClient) {
+      this.adminClient = this.client.admin();
+      await this.adminClient.connect();
+    }
+    return this.adminClient;
+  }
+
   async getQueueMessageCount(conf: IKafkaChannelConfig): Promise<number> {
-    const admin = this.client.admin()
-    await admin.connect()
+    const admin = await this.getAdminClient();
     try {
       // ... existing code ...
-    } finally {
-      await admin.disconnect()
     }
   }
```
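One trade-off with caching: the connection is no longer closed per call, so it should be released when the consumer shuts down. A minimal sketch, assuming a `stop()`-style lifecycle hook exists on the service (hypothetical here):

```ts
// Sketch: release the cached admin connection on shutdown. The lifecycle
// hook name is assumed; wire this into whatever teardown the service has.
// Assumes the field is declared as `private adminClient?: Admin`.
public async stop(): Promise<void> {
  if (this.adminClient) {
    await this.adminClient.disconnect()
    this.adminClient = undefined
  }
}
```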
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- services/apps/data_sink_worker/src/queue/index.ts (2 hunks)
- services/apps/data_sink_worker/src/service/dataSink.service.ts (6 hunks)
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts (2 hunks)
- services/libs/queue/src/vendors/kafka/client.ts (5 hunks)
🧰 Additional context used
🪛 GitHub Actions: Node.js CI
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts
[warning] 4-4: 'IIntegrationResult' is defined but never used
🔇 Additional comments (2)
services/apps/data_sink_worker/src/service/dataSink.service.ts (1)
276-279: Ensure consistent handling of result deletion in 'processResult' method

In the `processResult` method, the result is deleted if `result` is not provided:

```ts
if (!result) {
  await this.repo.deleteResult(resultId)
}
```

Confirm that deleting the result when `result` is undefined is the intended behavior. It's important to ensure that this logic does not inadvertently delete results that are still needed, especially if `resultInfo` was fetched from the database.

Also applies to: 286-286
✅ Verification successful
Result deletion is correctly implemented as a cleanup mechanism

The deletion of results when `result` is not found is intentional and safe. The code first attempts to use the provided result and, if that is not available, tries to fetch it from the database. The deletion only occurs when both attempts fail, effectively cleaning up orphaned result references. This is properly tracked with telemetry and logging; a condensed sketch of the pattern follows after the verification scripts below.

🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:
```bash
#!/bin/bash
# Find the complete implementation of processResult method
ast-grep --pattern 'async processResult($_) { $$$ }'

# Search for deleteResult usage and surrounding context
rg "deleteResult" -B 3 -A 3

# Look for any comments mentioning result deletion
rg -i "delete.*result|result.*delete" -B 2 -A 2
```

Length of output: 7332
Script:
```bash
#!/bin/bash
# Find processResult method implementation with more context
rg "processResult" -B 10 -A 20

# Look for test files related to dataSink
fd "dataSink.*test|dataSink.*spec" --type f
```

Length of output: 7066
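A condensed sketch of the pattern the verification describes (the names and repo shape are illustrative, not the service's actual API):

```ts
// Illustrative sketch of the fallback-then-cleanup flow in processResult.
interface IResultRepo {
  getResultInfo(id: string): Promise<object | null>
  deleteResult(id: string): Promise<void>
}

async function processResult(
  resultId: string,
  result: object | undefined,
  repo: IResultRepo,
): Promise<boolean> {
  // Prefer the in-memory result passed by the caller; fall back to the
  // database only when nothing was provided.
  const resultInfo = result ?? (await repo.getResultInfo(resultId)) ?? undefined

  // Only when both sources come up empty is the orphaned reference removed.
  if (!resultInfo) {
    await repo.deleteResult(resultId)
    return false
  }

  // ... normal processing of resultInfo ...
  return true
}
```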
services/libs/queue/src/vendors/kafka/client.ts (1)
414-414: LGTM: Proper cleanup of statistics interval.

The cleanup of the statistics interval in the error handler is correctly implemented.
```ts
client,
client.getQueueChannelConfig(CrowdQueue.DATA_SINK_WORKER),
3,
Number(process.env.WORKER_MAX_CONCURRENCY || 1),
```
💡 Codebase verification
Environment variable validation is needed for worker concurrency
The review suggestion is valid. The current implementation allows for potentially problematic values (negative numbers, non-integers, or NaN) which could affect the queue processing system's stability. The suggested validation ensures a positive integer value while maintaining the fallback behavior.
🔗 Analysis chain
Validate 'WORKER_MAX_CONCURRENCY' environment variable
The concurrency level is set using `Number(process.env.WORKER_MAX_CONCURRENCY || 1)`. If `WORKER_MAX_CONCURRENCY` is not a valid number or is less than 1, it may lead to unexpected behavior.

Consider adding validation to ensure that the concurrency level is a positive integer. For example (a reusable variant is sketched after the verification output below):
```diff
+const maxConcurrencyEnv = Number(process.env.WORKER_MAX_CONCURRENCY || 1)
+const maxConcurrency = Number.isInteger(maxConcurrencyEnv) && maxConcurrencyEnv > 0 ? maxConcurrencyEnv : 1
 super(
   level,
   client,
   client.getQueueChannelConfig(CrowdQueue.DATA_SINK_WORKER),
-  Number(process.env.WORKER_MAX_CONCURRENCY || 1),
+  maxConcurrency,
   parentLog,
   undefined,
   undefined,
   10,
 )
```

🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:
```bash
#!/bin/bash
# Get the complete implementation of the queue file
cat services/apps/data_sink_worker/src/queue/index.ts

# Look for similar environment variable validations
rg "process\.env\..*Number\(" -A 2 -B 2

# Search for any existing number validation patterns
ast-grep --pattern 'Number.isInteger($_)'
```
Length of output: 2971
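If this pattern recurs across services, the validation could also live in a small shared helper; a hypothetical sketch (the helper name is illustrative):

```ts
// Parse a positive-integer environment variable, falling back when the
// value is missing, non-numeric, non-integer, or not positive.
function positiveIntFromEnv(name: string, fallback: number): number {
  const parsed = Number(process.env[name])
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback
}

// Usage matching the suggestion above:
const maxConcurrency = positiveIntFromEnv('WORKER_MAX_CONCURRENCY', 1)
```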
```ts
timings.push(duration)
this.log.debug(`Message processed successfully in ${duration.toFixed(2)}ms!`)
```
🛠️ Refactor suggestion
Consider adding size bounds check when collecting timings.
Protect against memory issues by limiting the size of the timings array.
Apply this change to both success and error handlers:
```diff
 .then(() => {
   const duration = performance.now() - now
-  timings.push(duration)
+  if (timings.length < MAX_TIMINGS) {
+    timings.push(duration)
+  }
   this.log.debug(`Message processed successfully in ${duration.toFixed(2)}ms!`)
 })
 .catch((err) => {
   const duration = performance.now() - now
-  timings.push(duration)
+  if (timings.length < MAX_TIMINGS) {
+    timings.push(duration)
+  }
   this.log.error(err, `Message processed unsuccessfully in ${duration.toFixed(2)}ms!`)
 })
```

Also applies to: 402-403
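A cap like `MAX_TIMINGS` stops collecting once full, biasing the per-minute average toward the earliest messages in the window. If the most recent samples matter more, a fixed-size ring buffer is an alternative; a hypothetical sketch:

```ts
// Alternative sketch (illustrative, not the PR's implementation): a bounded
// ring buffer overwrites the oldest samples instead of dropping new ones.
class RingBuffer {
  private buf: number[]
  private next = 0
  private filled = false

  constructor(private readonly capacity: number) {
    this.buf = new Array(capacity)
  }

  push(value: number): void {
    this.buf[this.next] = value
    this.next = (this.next + 1) % this.capacity
    if (this.next === 0) this.filled = true
  }

  // Returns the stored samples and resets the buffer, mirroring the
  // "copy then clear" pattern used by the statistics interval.
  drain(): number[] {
    const values = this.filled ? this.buf.slice() : this.buf.slice(0, this.next)
    this.next = 0
    this.filled = false
    return values
  }
}
```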
```ts
let timings = []

statisticsInterval = setInterval(async () => {
  if (!this.started) {
    clearInterval(statisticsInterval)
    return
  }

  try {
    // Reset the timings array and calculate the average processing time
    const durations = [...timings]
    timings = []

    // Get the number of messages left in the queue
    const count = await this.getQueueMessageCount(queueConf)

    let message = `Topic has ${count} messages left!`
    if (durations.length > 0) {
      const average = durations.reduce((a, b) => a + b, 0) / durations.length
      message += ` In the last minute ${durations.length} messages were processed (${(durations.length / 60.0).toFixed(2)} msg/s) - average processing time: ${average.toFixed(2)}ms!`
    }
    this.log.info({ topic: queueConf.name }, message)
  } catch (err) {
    // do nothing
  }
}, 60000) // check every minute
```
🛠️ Refactor suggestion
Improve error handling and memory management in statistics tracking.
- The empty catch block could hide important errors.
- The `timings` array should be size-bounded to prevent memory issues.
Consider these improvements:
```diff
-let timings = []
+const MAX_TIMINGS = 1000; // Prevent unbounded growth
+let timings: number[] = [];

 statisticsInterval = setInterval(async () => {
   if (!this.started) {
     clearInterval(statisticsInterval)
     return
   }

   try {
     // Reset the timings array and calculate the average processing time
     const durations = [...timings]
     timings = []

     // Get the number of messages left in the queue
     const count = await this.getQueueMessageCount(queueConf)

     let message = `Topic has ${count} messages left!`
     if (durations.length > 0) {
       const average = durations.reduce((a, b) => a + b, 0) / durations.length
       message += ` In the last minute ${durations.length} messages were processed (${(
         durations.length / 60.0
       ).toFixed(2)} msg/s) - average processing time: ${average.toFixed(2)}ms!`
     }
     this.log.info({ topic: queueConf.name }, message)
   } catch (err) {
-    // do nothing
+    this.log.error({ err, topic: queueConf.name }, 'Failed to collect queue statistics')
   }
 }, 60000)
```
}, 60000)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
const MAX_TIMINGS = 1000; // Prevent unbounded growth
let timings: number[] = [];

statisticsInterval = setInterval(async () => {
  if (!this.started) {
    clearInterval(statisticsInterval)
    return
  }

  try {
    // Reset the timings array and calculate the average processing time
    const durations = [...timings]
    timings = []

    // Get the number of messages left in the queue
    const count = await this.getQueueMessageCount(queueConf)

    let message = `Topic has ${count} messages left!`
    if (durations.length > 0) {
      const average = durations.reduce((a, b) => a + b, 0) / durations.length
      message += ` In the last minute ${durations.length} messages were processed (${(
        durations.length / 60.0
      ).toFixed(2)} msg/s) - average processing time: ${average.toFixed(2)}ms!`
    }
    this.log.info({ topic: queueConf.name }, message)
  } catch (err) {
    this.log.error({ err, topic: queueConf.name }, 'Failed to collect queue statistics')
  }
}, 60000) // check every minute
```
Changes proposed ✍️
What
copilot:summary
copilot:poem
Why
How
copilot:walkthrough
Checklist ✅
Feature, Improvement, or Bug.

Summary by CodeRabbit
Configuration
Performance Monitoring
Service Updates
Repository Changes