feat(webui): Support viewing search results in context for JSON logs (clp-json). #596

Merged: 48 commits, merged on Nov 21, 2024. The diff below shows changes from 44 of the 48 commits.

Commits (all by haiqi96):
- 97d3e31 (Oct 30, 2024): Make it work with a lot of duplications
- 1d8e8e5 (Oct 30, 2024): deduplication the code in task launching scripts
- c00e28a (Oct 30, 2024): Remove redundant task for json. Combine them into one
- 08acaa7 (Oct 30, 2024): Linter
- 65bdd13 (Oct 30, 2024): refactor
- 3de1532 (Oct 31, 2024): Linter
- 6bd1eef (Nov 1, 2024): refactor
- fbe5ced (Nov 2, 2024): proposal
- 8c93ba3 (Nov 4, 2024): Add actual refactor
- 15c4b1d (Nov 4, 2024): Linter
- 8245200 (Nov 4, 2024): deduplication
- a0d4324 (Nov 4, 2024): Decompressor end renaming
- 0edfe49 (Nov 4, 2024): Renaming for task and scheduler
- 1f1dbb9 (Nov 4, 2024): Mass renaming
- e5a691c (Nov 5, 2024): Renaming for webui
- 29322c5 (Nov 5, 2024): missing change
- 41b1023 (Nov 5, 2024): polishing
- 2bcab50 (Nov 5, 2024): Update comments
- b19b9a9 (Nov 5, 2024): fixes
- a7a222d (Nov 7, 2024): fixes
- f05d122 (Nov 7, 2024): fixes2
- 8a93597 (Nov 8, 2024): backup
- 563f11b (Nov 8, 2024): update webui part
- f274da9 (Nov 12, 2024): Support log event index version
- ede18a8 (Nov 12, 2024): Tried my best with linter
- d2eae63 (Nov 13, 2024): Factor variables to be global
- d12a39d (Nov 13, 2024): Fix linter again
- fdffdb9 (Nov 13, 2024): Merge branch 'main' into webuiLogEventIdx
- 9e41095 (Nov 14, 2024): Update to support latest CLP-S's search results output format
- 6a3d7c1 (Nov 18, 2024): Merge branch 'main' into webuiLogEventIdx
- eb66fc1 (Nov 18, 2024): Further polishing
- e7cddc2 (Nov 19, 2024): fixes
- d5a4990 (Nov 19, 2024): make changes anyway
- 2618de0 (Nov 19, 2024): code review suggestions
- 2b6ff2f (Nov 19, 2024): Apply suggestions from code review
- a6b9fe9 (Nov 19, 2024): Commit code review suggestions
- be56609 (Nov 19, 2024): Linter
- bdfaf19 (Nov 19, 2024): Merge branch 'main' into webuiLogEventIdx
- 63725ff (Nov 19, 2024): Add more error checking
- ae6a601 (Nov 19, 2024): Fixes
- ce1acdb (Nov 19, 2024): Prepare for upcoming changes from CLP-s
- b631316 (Nov 19, 2024): fix
- 1bb72c0 (Nov 20, 2024): Merge branch 'main' into webuiLogEventIdx
- d06be72 (Nov 21, 2024): fix missing changes
- b9b8440 (Nov 21, 2024): address code review
- 66e3466 (Nov 21, 2024): linter
- 99d2240 (Nov 21, 2024): commit code suggestions
- a897682 (Nov 21, 2024): Rearrange code and polish error message
```diff
@@ -936,6 +936,7 @@ def start_log_viewer_webui(
         "MongoDbStreamFilesCollectionName": clp_config.results_cache.stream_collection_name,
         "ClientDir": str(container_log_viewer_webui_dir / "client"),
         "StreamFilesDir": str(container_clp_config.stream_output.directory),
+        "StreamTargetUncompressedSize": container_clp_config.stream_output.target_uncompressed_size,
         "LogViewerDir": str(container_log_viewer_webui_dir / "yscope-log-viewer"),
     }
     settings_json = read_and_update_settings_json(settings_json_path, settings_json_updates)
```
components/clp-py-utils/clp_py_utils/clp_config.py (2 changes: 1 addition & 1 deletion)
```diff
@@ -346,7 +346,7 @@ def dump_to_primitive_dict(self):


 class StreamOutput(BaseModel):
-    directory: pathlib.Path = pathlib.Path("var") / "data" / "stream"
+    directory: pathlib.Path = pathlib.Path("var") / "data" / "streams"
     target_uncompressed_size: int = 128 * 1024 * 1024

     @validator("directory")
```
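For reference, the 128 * 1024 * 1024-byte default above equals 134217728, the same value that appears as StreamTargetUncompressedSize in the web UI's settings.json later in this diff.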
components/log-viewer-webui/client/src/api/query.js (29 changes: 20 additions & 9 deletions)
```diff
@@ -13,20 +13,31 @@ import axios from "axios";
  */

 /**
- * Submits a job to extract the split of an original file that contains a given log event. The file
- * is extracted as a CLP IR file.
+ * @typedef {object} ExtractJsonResp
+ * @property {number} begin_msg_ix
+ * @property {number} end_msg_ix
+ * @property {boolean} is_last_ir_chunk
+ * @property {string} orig_file_id
+ * @property {string} path
+ * @property {string} _id
+ */
```
coderabbitai bot (Contributor) commented on lines +16 to +23 (Nov 19, 2024):

⚠️ Potential issue: Update the ExtractJsonResp type definition to match the actual JSON response structure.

The current type definition appears to be copied from ExtractIrResp without considering JSON-specific differences. Based on the PR objectives:

1. Remove IR-specific fields: is_last_ir_chunk, orig_file_id
2. Add JSON-specific fields: archive_id

Apply this diff to fix the type definition:

```diff
 /**
  * @typedef {object} ExtractJsonResp
  * @property {number} begin_msg_ix
  * @property {number} end_msg_ix
- * @property {boolean} is_last_ir_chunk
- * @property {string} orig_file_id
+ * @property {string} archive_id
  * @property {string} path
  * @property {string} _id
  */
```

junhaoliao (Collaborator) replied: The rabbit understood it wrong. With that said, similar to the query job type, shall we document the source definition (i.e., the MongoDB insertion code) of this?

coderabbitai bot replied: @junhaoliao, thank you for the clarification. It seems the current ExtractJsonResp type definition is accurate.

Documenting the source definition, such as the MongoDB insertion code, would be beneficial for future reference. You could add a comment above the typedef to indicate where this response structure is defined, for example:

```js
// Response structure defined in server/src/routes/query.js during MongoDB insertion
```

This would help maintain consistency and clarity between the client and server code.

✏️ Learning added (from junhaoliao, PR y-scope/clp#596, components/log-viewer-webui/client/src/api/query.js:16-23, 2024-11-21T15:51:33.005Z): the ExtractJsonResp type definition is accurate as-is and does not require modification. When suggesting changes to type definitions, ensure they align with the server-side definitions, referencing the source code if necessary.


```diff
 /**
+ * Submits a job to extract the stream that contains a given log event. The stream is extracted
+ * either as a CLP IR or a JSON Lines file.
  *
- * @param {number|string} origFileId The ID of the original file
- * @param {number} logEventIdx The index of the log event
+ * @param {QUERY_JOB_TYPE} extractJobType
+ * @param {string} streamId
+ * @param {number} logEventIdx
  * @param {Function} onUploadProgress Callback to handle upload progress events.
- * @return {Promise<axios.AxiosResponse<ExtractIrResp>>}
+ * @return {Promise<axios.AxiosResponse<ExtractIrResp|ExtractJsonResp>>}
```
coderabbitai bot commented on lines +25 to +33:
⚠️ Potential issue: Add an import for the QUERY_JOB_TYPE enum.

The JSDoc references QUERY_JOB_TYPE but it's not imported. This could cause type-checking issues.

Add the import at the top of the file:

```diff
 import axios from "axios";
+import {QUERY_JOB_TYPE} from "../constants/queryTypes";
```
```diff
  */
-const submitExtractIrJob = async (origFileId, logEventIdx, onUploadProgress) => {
+const submitExtractStreamJob = async (extractJobType, streamId, logEventIdx, onUploadProgress) => {
     return await axios.post(
-        "/query/extract-ir",
-        {logEventIdx, origFileId},
+        "/query/extract-stream",
+        {extractJobType, streamId, logEventIdx},
         {onUploadProgress}
     );
 };
```
coderabbitai bot (Contributor) commented on lines +35 to 41 (Nov 19, 2024):

🛠️ Refactor suggestion: Add parameter validation for extractJobType.

The function accepts an extractJobType parameter but doesn't validate its value. Consider adding validation to ensure only valid job types are processed.

```diff
 const submitExtractStreamJob = async (extractJobType, streamId, logEventIdx, onUploadProgress) => {
+    const validJobTypes = [EXTRACT_IR, EXTRACT_JSON]; // Import these from a constants file
+    if (!validJobTypes.includes(extractJobType)) {
+        throw new Error(`Invalid extractJobType: ${extractJobType}`);
+    }
     return await axios.post(
         "/query/extract-stream",
         {extractJobType, streamId, logEventIdx},
         {onUploadProgress}
     );
 };
```

junhaoliao (Collaborator) replied: Is it fine to rely on the JsDoc type checks to guard against misuses?

coderabbitai bot replied: @junhaoliao, JsDoc type annotations are indeed valuable for documentation and can assist with static analysis during development, especially when using tools like TypeScript. However, they don't enforce type checking at runtime in standard JavaScript. Adding explicit runtime validation for extractJobType ensures that any invalid inputs are caught immediately, which can prevent unexpected behavior or errors in production. If the function might be called with incorrect parameters, either due to coding errors or external factors, runtime checks can enhance the robustness of the code. That said, if you're confident that inputs are properly controlled and validated elsewhere, relying on JsDoc may be acceptable.

junhaoliao (Collaborator) replied: Since those APIs are not directly exposed to end users.

coderabbitai bot replied: @junhaoliao, understood. Since these APIs are not directly exposed to end users, and their usage is controlled internally, relying on JsDoc type annotations may be sufficient in this context. Thank you for the clarification.

✏️ Learning added (from junhaoliao, PR y-scope/clp#596, components/log-viewer-webui/client/src/api/query.js:35-41, 2024-11-19T19:52:43.245Z): for internal APIs in components/log-viewer-webui/client/src/api/query.js, runtime validation of parameters may not be necessary since the APIs are not exposed to end users, and JsDoc type annotations may be sufficient.


```diff
-export {submitExtractIrJob};
+export {submitExtractStreamJob};
```
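For orientation, a minimal usage sketch of the renamed client API (not from the PR; the import path, the enum reference, and all argument values are hypothetical):

```js
import {submitExtractStreamJob} from "./api/query.js";

// QUERY_JOB_TYPE refers to the client-side enum duplicated in
// QueryStatus.jsx (shown below); the stream ID and index are made up.
const {data} = await submitExtractStreamJob(
    QUERY_JOB_TYPE.EXTRACT_JSON,
    "6745a1b2c3d4e5f6a7b8c9d0", // for JSON extraction, this is an archive ID
    42,                          // index of the log event to view in context
    (e) => console.log(`upload progress: ${e.loaded} bytes`)
);
// `data` holds the extracted stream's metadata (see the typedefs above).
```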
components/log-viewer-webui/client/src/ui/QueryStatus.jsx (50 changes: 43 additions & 7 deletions)
```diff
@@ -6,11 +6,37 @@ import {

 import {AxiosError} from "axios";

-import {submitExtractIrJob} from "../api/query.js";
+import {submitExtractStreamJob} from "../api/query.js";
 import {QUERY_LOADING_STATES} from "../typings/query.js";
 import Loading from "./Loading.jsx";


+let enumQueryType;
+/* eslint-disable sort-keys */
+/**
+ * Note: This enum is duplicated from server, as it is non-trivial to include server enums from the
+ * client.
+ *
+ * Enum of job types, matching the `QueryJobType` class in
+ * `job_orchestration.query_scheduler.constants`.
+ *
+ * @enum {number}
+ */
+const QUERY_JOB_TYPE = Object.freeze({
+    SEARCH_OR_AGGREGATION: (enumQueryType = 0),
+    EXTRACT_IR: ++enumQueryType,
+    EXTRACT_JSON: ++enumQueryType,
```
coderabbitai bot commented on lines +26 to +28:
⚠️ Potential issue: Avoid assignments within object initializations for clarity.

Assigning values to enumQueryType within the object initializer can be confusing and is flagged by static analysis tools. It's better to initialize enumQueryType before defining the object.

Apply this diff to resolve the issue:

```diff
-let enumQueryType;
+let enumQueryType = 0;

 /* eslint-disable sort-keys */
 /**
  * Note: This enum is duplicated from server, as it is non-trivial to include server enums from the
  * client.
  *
  * Enum of job types, matching the `QueryJobType` class in
  * `job_orchestration.query_scheduler.constants`.
  *
  * @enum {number}
  */
 const QUERY_JOB_TYPE = Object.freeze({
-    SEARCH_OR_AGGREGATION: (enumQueryType = 0),
+    SEARCH_OR_AGGREGATION: enumQueryType++,
     EXTRACT_IR: ++enumQueryType,
     EXTRACT_JSON: ++enumQueryType,
 });
 /* eslint-enable sort-keys */
```
🧰 Tools / 🪛 Biome: [error] 26-26: The assignment should not be in an expression. The use of assignments in expressions is confusing; expressions are often considered side-effect free. (lint/suspicious/noAssignInExpressions)

```diff
+});
+/* eslint-enable sort-keys */
+
+/**
+ * Mapping between job type enums and stream type
+ */
+const EXTRACT_JOB_TYPE = Object.freeze({
+    ir: QUERY_JOB_TYPE.EXTRACT_IR,
+    json: QUERY_JOB_TYPE.EXTRACT_JSON,
+});
```
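As a quick sanity check (not part of the PR) of the increment pattern above, the enum members evaluate to consecutive integers, and EXTRACT_JOB_TYPE maps the `type` URL parameter onto them:

```js
console.log(QUERY_JOB_TYPE.SEARCH_OR_AGGREGATION); // 0
console.log(QUERY_JOB_TYPE.EXTRACT_IR);            // 1
console.log(QUERY_JOB_TYPE.EXTRACT_JSON);          // 2
console.log(EXTRACT_JOB_TYPE.json === QUERY_JOB_TYPE.EXTRACT_JSON); // true
```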

```diff
 /**
  * Submits queries and renders the query states.
  *
@@ -28,20 +54,30 @@ const QueryStatus = () => {
         isFirstRun.current = false;

         const searchParams = new URLSearchParams(window.location.search);
-        const origFileId = searchParams.get("origFileId");
+        const streamType = searchParams.get("type");
+        const streamId = searchParams.get("streamId");
         const logEventIdx = searchParams.get("logEventIdx");
-        if (null === origFileId || null === logEventIdx) {
-            const error = "Either `origFileId` or `logEventIdx` are missing from the URL " +
-                "parameters. Note that non-IR-extraction queries are not supported at the moment.";
+
+        if (null === streamType || null === streamId || null === logEventIdx) {
+            const error = "Queries parameters are missing from the URL parameters.";
             console.error(error);
             setErrorMsg(error);

             return;
         }

+        const extractJobType = EXTRACT_JOB_TYPE[streamType];
+        if ("undefined" === typeof extractJobType) {
+            const error = `Unsupported Stream type: ${streamType}`;
+            console.error(error);
+            setErrorMsg(error);
+
+            return;
+        }
+
-        submitExtractIrJob(
-            origFileId,
+        submitExtractStreamJob(
+            extractJobType,
+            streamId,
             Number(logEventIdx),
             () => {
                 setQueryState(QUERY_LOADING_STATES.WAITING);
```
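To make the new URL contract concrete, a hypothetical example (the page path is invented; the parameter names match the diff above):

```js
// e.g. a link of the form .../?type=json&streamId=<archive-id>&logEventIdx=42
const searchParams = new URLSearchParams("type=json&streamId=abc123&logEventIdx=42");

searchParams.get("type");        // "json", mapped via EXTRACT_JOB_TYPE
searchParams.get("streamId");    // "abc123"
searchParams.get("logEventIdx"); // "42" (a string; converted with Number() above)
```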
components/log-viewer-webui/server/settings.json (1 change: 1 addition & 0 deletions)
```diff
@@ -10,5 +10,6 @@

     "ClientDir": "../client/dist",
     "StreamFilesDir": "../../../build/clp-package/var/data/streams",
+    "StreamTargetUncompressedSize": 134217728,
     "LogViewerDir": "../yscope-log-viewer/dist"
 }
```
components/log-viewer-webui/server/src/DbManager.js (21 changes: 12 additions & 9 deletions)
```diff
@@ -62,6 +62,7 @@ let enumQueryType;
 const QUERY_JOB_TYPE = Object.freeze({
     SEARCH_OR_AGGREGATION: (enumQueryType = 0),
     EXTRACT_IR: ++enumQueryType,
+    EXTRACT_JSON: ++enumQueryType,
 });
 /* eslint-enable sort-keys */
```
```diff
@@ -101,20 +102,21 @@ class DbManager {
     }

     /**
-     * Submits an IR extraction job to the scheduler and waits for it to finish.
+     * Submits a stream extraction job to the scheduler and waits for it to finish.
      *
      * @param {object} jobConfig
+     * @param {number} jobType
      * @return {Promise<number|null>} The ID of the job or null if an error occurred.
      */
-    async submitAndWaitForExtractIrJob (jobConfig) {
+    async submitAndWaitForExtractStreamJob (jobConfig, jobType) {
         let jobId;
```
Collaborator comment:
Suggested change:

```diff
+        let jobConfig;
+        if (QUERY_JOB_TYPE.EXTRACT_IR === jobType) {
+            jobConfig = {
+                file_split_id: null,
+                msg_ix: logEventIdx,
+                orig_file_id: streamId,
+                target_uncompressed_size: targetUncompressedSize,
+            };
+        } else if (QUERY_JOB_TYPE.EXTRACT_JSON === jobType) {
+            jobConfig = {
+                archive_id: streamId,
+                target_chunk_size: targetUncompressedSize,
+            };
+        }
         let jobId;
```

try {
const [result] = await this.#mysqlConnectionPool.query(
`INSERT INTO ${this.#queryJobsTableName} (job_config, type)
VALUES (?, ?)`,
[
Buffer.from(msgpackEncode(jobConfig)),
QUERY_JOB_TYPE.EXTRACT_IR,
jobType,
]
);

Expand All @@ -130,16 +132,16 @@ class DbManager {
}

```diff
     /**
-     * Gets the metadata for an IR file extracted from part of an original file, where the original
-     * file has the given ID and the extracted part contains the given log event index.
+     * Gets the metadata for the extracted stream that has the given streamId and contains the
+     * given logEventIdx.
      *
-     * @param {string} origFileId
+     * @param {string} streamId
      * @param {number} logEventIdx
-     * @return {Promise<object>} A promise that resolves to the extracted IR file's metadata.
+     * @return {Promise<object>} A promise that resolves to the extracted stream's metadata.
      */
-    async getExtractedIrFileMetadata (origFileId, logEventIdx) {
+    async getExtractedStreamFileMetadata (streamId, logEventIdx) {
         return await this.#streamFilesCollection.findOne({
-            orig_file_id: origFileId,
+            orig_file_id: streamId,
```
Collaborator comment: I haven't read the MongoDB insertion code, but it's a little odd that the key name orig_file_id is used in this stream file metadata collection. Could you confirm if this is intended and, if so, whether we have plans to update the DB insertion code?

Collaborator comment: (Also, semantically, are there any differences between a stream ID and an original file ID?)

haiqi96 (Contributor, Author) replied: This part is tricky. "Stream" is a concept I started using a few weeks ago in order to commonize IR and JSON. An IR stream is identified by its orig_file_id, and a JSON Lines file is identified by archive_id.

When the CLP code was written, we used orig_file_id for IR. Later, when we wrote code for JSON, somehow we kept using orig_file_id to represent archive_id in the metadata. (I was not involved in the design process.)

For now, the easiest way is to just use orig_file_id. After the release, we can either let JSON and IR use different fields, or let them both use a single field, stream_id. I personally prefer the latter.

@kirkrodrigues any opinion?

kirkrodrigues (Member) replied: Sure, we can switch it to stream_id.

```diff
             begin_msg_ix: {$lte: logEventIdx},
             end_msg_ix: {$gt: logEventIdx},
         });
```
```diff
@@ -236,6 +238,7 @@
     }
 }

+export {QUERY_JOB_TYPE};
 export default fastifyPlugin(async (app, options) => {
     await app.decorate("dbManager", new DbManager(app, options));
 });
```
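For orientation, a hypothetical stream-metadata document that getExtractedStreamFileMetadata() would match (field names follow the ExtractJsonResp typedef earlier in this PR; all values are invented):

```js
const exampleStreamMetadata = {
    _id: "stream-chunk-0.jsonl",               // hypothetical stream file name
    orig_file_id: "6745a1b2c3d4e5f6a7b8c9d0",  // the archive ID, despite the field name
    path: "var/data/streams/stream-chunk-0.jsonl",
    begin_msg_ix: 0,
    end_msg_ix: 1000,
};
// findOne() matches when begin_msg_ix <= logEventIdx < end_msg_ix.
```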
components/log-viewer-webui/server/src/routes/query.js (95 changes: 73 additions & 22 deletions)
```diff
@@ -1,5 +1,57 @@
-// eslint-disable-next-line no-magic-numbers
-const EXTRACT_IR_TARGET_UNCOMPRESSED_SIZE = 128 * 1024 * 1024;
+import settings from "../../settings.json" with {type: "json"};
+import {QUERY_JOB_TYPE} from "../DbManager.js";
+
+
+/**
+ * Submits a stream extraction job with the given parameters and waits for it.
+ *
+ * @param {import("fastify").FastifyInstance | {dbManager: DbManager}} fastify
+ * @param {QUERY_JOB_TYPE} extractJobType
+ * @param {string} streamId
+ * @param {int} sanitizedLogEventIdx
+ * @throws {Error} if the extract stream job submission fails.
+ */
+const submitAndWaitForExtractStreamJob = async (
+    fastify,
+    extractJobType,
+    streamId,
+    sanitizedLogEventIdx
+) => {
+    let jobConfig;
+    const streamTargetUncompressedSize = settings.StreamTargetUncompressedSize;
+    if (QUERY_JOB_TYPE.EXTRACT_IR === extractJobType) {
+        jobConfig = {
+            file_split_id: null,
+            msg_ix: sanitizedLogEventIdx,
+            orig_file_id: streamId,
+            target_uncompressed_size: streamTargetUncompressedSize,
+        };
+    } else if (QUERY_JOB_TYPE.EXTRACT_JSON === extractJobType) {
+        jobConfig = {
+            archive_id: streamId,
+            target_chunk_size: streamTargetUncompressedSize,
+        };
+    }
+
+    if (null === jobConfig) {
+        const err = new Error(`Unsupported Job type: ${extractJobType}`);
+        err.statusCode = 400;
+        throw err;
+    }
+
+    const extractResult = await fastify.dbManager.submitAndWaitForExtractStreamJob(
+        jobConfig,
+        extractJobType
+    );
+
+    if (null === extractResult) {
+        const err = new Error("Unable to extract stream with " +
+            `streamId=${streamId} at logEventIdx=${sanitizedLogEventIdx}`);
+
+        err.statusCode = 400;
+        throw err;
+    }
+};
```

Collaborator comment: If you agree, the logic in this helper can be moved into dbManager.submitAndWaitForExtractStreamJob. Let's see the suggestion there.


```diff
 /**
  * Creates query routes.
@@ -9,37 +61,36 @@ const EXTRACT_IR_TARGET_UNCOMPRESSED_SIZE = 128 * 1024 * 1024;
  * @return {Promise<void>}
  */
 const routes = async (fastify, options) => {
```
```diff
-    fastify.post("/query/extract-ir", async (req, resp) => {
-        const {origFileId, logEventIdx} = req.body;
+    fastify.post("/query/extract-stream", async (req, resp) => {
+        const {extractJobType, logEventIdx, streamId} = req.body;
```
coderabbitai bot (Contributor) commented (Nov 19, 2024):

⚠️ Potential issue: Ensure backward compatibility when changing the endpoint URL.

Changing the endpoint from /query/extract-ir to /query/extract-stream may affect existing clients relying on the old URL. Consider supporting the old endpoint or informing clients to update to prevent disruptions.

haiqi96 (Contributor, Author) replied: It's OK, we will update all clients as well.

coderabbitai bot replied: @haiqi96 Understood. Let me know if there's anything else I can assist with.

```diff
         const sanitizedLogEventIdx = Number(logEventIdx);
```
Collaborator comment: Let's move the parameter validation checks directly below this line.


coderabbitai bot (Contributor) commented (Nov 19, 2024):
🛠️ Refactor suggestion: Add input validation for request parameters.

While converting logEventIdx to a number, we should add proper validation for all input parameters to ensure robustness.

Apply this diff to add comprehensive validation:

```diff
         const {extractJobType, logEventIdx, streamId} = req.body;
-        const sanitizedLogEventIdx = Number(logEventIdx);
+
+        if (!streamId || typeof streamId !== 'string') {
+            const err = new Error('Invalid streamId');
+            err.statusCode = 400;
+            throw err;
+        }
+
+        if (!extractJobType || !Object.values(QUERY_JOB_TYPE).includes(extractJobType)) {
+            const err = new Error(`Invalid extractJobType: ${extractJobType}`);
+            err.statusCode = 400;
+            throw err;
+        }
+
+        const sanitizedLogEventIdx = Number(logEventIdx);
+        if (isNaN(sanitizedLogEventIdx) || sanitizedLogEventIdx < 0) {
+            const err = new Error('Invalid logEventIdx');
+            err.statusCode = 400;
+            throw err;
+        }
```

haiqi96 (Contributor, Author) replied: @junhaoliao do you think it would be nice to add those checks? It might hit the 20-statement limit though.

coderabbitai bot replied: Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

junhaoliao (Collaborator) replied: I agree, for extractJobType and streamId. Let's add this list, to be exported, directly below QUERY_JOB_TYPE:

```js
/**
 * List of valid extract job types.
 */
const EXTRACT_JOB_TYPES = Object.freeze([
    QUERY_JOB_TYPE.EXTRACT_IR,
    QUERY_JOB_TYPE.EXTRACT_JSON,
]);
```

and add a check for extractJobType here, e.g.,

```js
if (false === EXTRACT_JOB_TYPES.includes(extractJobType)) {
    const err = new Error("Invalid extractJobType");
    err.statusCode = 400;
    throw err;
}
```

We also want to check / sanitize the streamId to avoid SQL injection attacks. @kirkrodrigues do you think we should check the input parameter's length and limit the character set? Or maybe checking it against a RegExp is efficient.

junhaoliao (Collaborator) replied: Actually, I was wrong about the SQL injection vulnerability. The mysqlConnectionPool.query method should sanitize the input to some extent. Let's just check whether the streamId is a string here.

Side question @haiqi96: does the scheduler hang when the streamId is an invalid string?

haiqi96 (Contributor, Author) replied: The scheduler shall return a failed job, saying the streamId doesn't resolve to any archive/file. So it should be fine.

kirkrodrigues (Member) replied: Sounds like my input is no longer needed, right?

haiqi96 (Contributor, Author) replied: Right, we are good now.

coderabbitai bot (Contributor) commented:

🛠️ Refactor suggestion: Add validation for the numeric conversion.

The Number(logEventIdx) conversion should be validated to ensure it doesn't result in NaN or negative values.

```diff
-        const sanitizedLogEventIdx = Number(logEventIdx);
+        const sanitizedLogEventIdx = Number(logEventIdx);
+        if (Number.isNaN(sanitizedLogEventIdx) || sanitizedLogEventIdx < 0) {
+            resp.code(StatusCodes.BAD_REQUEST);
+            throw new Error("logEventIdx must be a non-negative number");
+        }
```
coderabbitai bot (Contributor) commented:

🛠️ Refactor suggestion: Add validation for the logEventIdx parameter.

The logEventIdx parameter should be validated before conversion to ensure it's defined.

```diff
+        if (undefined === logEventIdx) {
+            resp.code(StatusCodes.BAD_REQUEST);
+            throw new Error("\"logEventIdx\" must be provided.");
+        }
         const sanitizedLogEventIdx = Number(logEventIdx);
```

```diff
-        let irMetadata = await fastify.dbManager.getExtractedIrFileMetadata(
-            origFileId,
+        let streamMetadata = await fastify.dbManager.getExtractedStreamFileMetadata(
+            streamId,
             sanitizedLogEventIdx
         );

-        if (null === irMetadata) {
-            const extractResult = await fastify.dbManager.submitAndWaitForExtractIrJob({
-                file_split_id: null,
-                msg_ix: sanitizedLogEventIdx,
-                orig_file_id: origFileId,
-                target_uncompressed_size: EXTRACT_IR_TARGET_UNCOMPRESSED_SIZE,
-            });
+        if (null === streamMetadata) {
+            await submitAndWaitForExtractStreamJob(
+                fastify,
+                extractJobType,
+                streamId,
+                sanitizedLogEventIdx
+            );
+            streamMetadata = await fastify.dbManager.getExtractedStreamFileMetadata(
```
Collaborator comment: So sorry for sharing a suboptimal refactoring technique previously. Instead of creating a submitAndWaitForExtractStreamJob helper in this file, can we move the job config construction logic into the DbManager's submitAndWaitForExtractStreamJob method?

Suggested change:

```diff
-            await submitAndWaitForExtractStreamJob(
-                fastify,
-                extractJobType,
-                streamId,
-                sanitizedLogEventIdx
-            );
+            const extractResult = await fastify.dbManager.submitAndWaitForExtractStreamJob({
+                jobType: extractJobType,
+                logEventIdx: sanitizedLogEventIdx,
+                streamId: streamId,
+                targetUncompressedSize: settings.StreamTargetUncompressedSize,
+            });
+            if (null === extractResult) {
+                const err = new Error("Unable to extract stream with " +
+                    `streamId=${streamId} at logEventIdx=${sanitizedLogEventIdx}`);
+                err.statusCode = 400;
+                throw err;
+            }
```
```diff
+                streamId,
+                sanitizedLogEventIdx
+            );

-            if (null === extractResult) {
-                const err = new Error("Unable to extract IR for file with " +
-                    `origFileId=${origFileId} at logEventIdx=${sanitizedLogEventIdx}`);
+            if (null === streamMetadata) {
+                const err = new Error("Unable to find the metadata of extracted stream with " +
+                    `streamId=${streamId} at logEventIdx=${sanitizedLogEventIdx}`);

                 err.statusCode = 400;
                 throw err;
             }
-            irMetadata = await fastify.dbManager.getExtractedIrFileMetadata(
-                origFileId,
-                sanitizedLogEventIdx
-            );
         }

-        return irMetadata;
+        return streamMetadata;
     });
 };
```
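Taken together, the reworked handler's flow is: look up the metadata of an already-extracted stream containing the requested log event; on a cache miss, submit an extraction job (IR or JSON, per extractJobType), wait for it, and re-query the metadata; if the metadata still cannot be found, respond with status 400.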
components/package-template/src/etc/clp-config.yml (2 changes: 1 addition & 1 deletion)
```diff
@@ -86,7 +86,7 @@
 #stream_output:
 #  directory: "var/data/streams"
```
Collaborator comment: Can we check if issue #608 is valid?

haiqi96 (Contributor, Author) replied: You are right. Pushed a fix.

Collaborator comment: Let's also mention "fixes #608" in the PR description to link the issue (or we can do it in the commit message body if you see fit).

haiqi96 (Contributor, Author) replied: Done.

```diff
 #
-#  # How large each IR file should be before being split into a new IR file
+#  # How large each stream file should be before being split into a new stream file
 #  target_uncompressed_size: 134217728  # 128 MB
 #
 ## Location where other data (besides archives) are stored. It will be created if
```