-
Notifications
You must be signed in to change notification settings - Fork 71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(webui): Support viewing search results in context for JSON logs (clp-json). #596
Changes from 46 commits
97d3e31
1d8e8e5
c00e28a
08acaa7
65bdd13
3de1532
6bd1eef
fbe5ced
8c93ba3
15c4b1d
8245200
a0d4324
0edfe49
1f1dbb9
e5a691c
29322c5
41b1023
2bcab50
b19b9a9
a7a222d
f05d122
8a93597
563f11b
f274da9
ede18a8
d2eae63
d12a39d
fdffdb9
9e41095
6a3d7c1
eb66fc1
e7cddc2
d5a4990
2618de0
2b6ff2f
a6b9fe9
be56609
bdfaf19
63725ff
ae6a601
ce1acdb
b631316
1bb72c0
d06be72
b9b8440
66e3466
99d2240
a897682
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -13,20 +13,31 @@ import axios from "axios"; | |||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||||||||||||
* Submits a job to extract the split of an original file that contains a given log event. The file | ||||||||||||||||||||||||||||||||||||||||||||||||||
* is extracted as a CLP IR file. | ||||||||||||||||||||||||||||||||||||||||||||||||||
* @typedef {object} ExtractJsonResp | ||||||||||||||||||||||||||||||||||||||||||||||||||
* @property {number} begin_msg_ix | ||||||||||||||||||||||||||||||||||||||||||||||||||
* @property {number} end_msg_ix | ||||||||||||||||||||||||||||||||||||||||||||||||||
* @property {boolean} is_last_ir_chunk | ||||||||||||||||||||||||||||||||||||||||||||||||||
* @property {string} orig_file_id | ||||||||||||||||||||||||||||||||||||||||||||||||||
* @property {string} path | ||||||||||||||||||||||||||||||||||||||||||||||||||
* @property {string} _id | ||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||||||||||||
* Submits a job to extract the stream that contains a given log event. The stream is extracted | ||||||||||||||||||||||||||||||||||||||||||||||||||
* either as a CLP IR or a JSON Lines file. | ||||||||||||||||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||||||||||||||||
* @param {number|string} origFileId The ID of the original file | ||||||||||||||||||||||||||||||||||||||||||||||||||
* @param {number} logEventIdx The index of the log event | ||||||||||||||||||||||||||||||||||||||||||||||||||
* @param {QUERY_JOB_TYPE} extractJobType | ||||||||||||||||||||||||||||||||||||||||||||||||||
* @param {string} streamId | ||||||||||||||||||||||||||||||||||||||||||||||||||
* @param {number} logEventIdx | ||||||||||||||||||||||||||||||||||||||||||||||||||
* @param {Function} onUploadProgress Callback to handle upload progress events. | ||||||||||||||||||||||||||||||||||||||||||||||||||
* @return {Promise<axios.AxiosResponse<ExtractIrResp>>} | ||||||||||||||||||||||||||||||||||||||||||||||||||
* @return {Promise<axios.AxiosResponse<ExtractIrResp|ExtractJsonResp>>} | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+25
to
+33
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add import for QUERY_JOB_TYPE enum The JSDoc references Add the import at the top of the file: import axios from "axios";
+import {QUERY_JOB_TYPE} from "../constants/queryTypes"; 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||
const submitExtractIrJob = async (origFileId, logEventIdx, onUploadProgress) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||
const submitExtractStreamJob = async (extractJobType, streamId, logEventIdx, onUploadProgress) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return await axios.post( | ||||||||||||||||||||||||||||||||||||||||||||||||||
"/query/extract-ir", | ||||||||||||||||||||||||||||||||||||||||||||||||||
{logEventIdx, origFileId}, | ||||||||||||||||||||||||||||||||||||||||||||||||||
"/query/extract-stream", | ||||||||||||||||||||||||||||||||||||||||||||||||||
{extractJobType, streamId, logEventIdx}, | ||||||||||||||||||||||||||||||||||||||||||||||||||
{onUploadProgress} | ||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+35
to
41
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add parameter validation for extractJobType The function accepts an extractJobType parameter but doesn't validate its value. Consider adding validation to ensure only valid job types are processed. const submitExtractStreamJob = async (extractJobType, streamId, logEventIdx, onUploadProgress) => {
+ const validJobTypes = [EXTRACT_IR, EXTRACT_JSON]; // Import these from a constants file
+ if (!validJobTypes.includes(extractJobType)) {
+ throw new Error(`Invalid extractJobType: ${extractJobType}`);
+ }
return await axios.post(
"/query/extract-stream",
{extractJobType, streamId, logEventIdx},
{onUploadProgress}
);
};
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it fine to rely the JsDoc type checks to guard against misuses? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @junhaoliao, JsDoc type annotations are indeed valuable for documentation and can assist with static analysis during development, especially when using tools like TypeScript. However, they don't enforce type checking at runtime in standard JavaScript. Adding explicit runtime validation for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since those APIs are not directly exposed to end users. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @junhaoliao, understood. Since these APIs are not directly exposed to end users, and their usage is controlled internally, relying on JsDoc type annotations may be sufficient in this context. Thank you for the clarification. ✏️ Learnings added
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
export {submitExtractIrJob}; | ||||||||||||||||||||||||||||||||||||||||||||||||||
export {submitExtractStreamJob}; |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -6,11 +6,37 @@ import { | |||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
import {AxiosError} from "axios"; | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
import {submitExtractIrJob} from "../api/query.js"; | ||||||||||||||||||||||||||||||||||||||||||||
import {submitExtractStreamJob} from "../api/query.js"; | ||||||||||||||||||||||||||||||||||||||||||||
import {QUERY_LOADING_STATES} from "../typings/query.js"; | ||||||||||||||||||||||||||||||||||||||||||||
import Loading from "./Loading.jsx"; | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
let enumQueryType; | ||||||||||||||||||||||||||||||||||||||||||||
/* eslint-disable sort-keys */ | ||||||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||||||
* Note: This enum is duplicated from server, as it is non-trivial to include server enums from the | ||||||||||||||||||||||||||||||||||||||||||||
* client. | ||||||||||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||||||||||
* Enum of job types, matching the `QueryJobType` class in | ||||||||||||||||||||||||||||||||||||||||||||
* `job_orchestration.query_scheduler.constants`. | ||||||||||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||||||||||
* @enum {number} | ||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||
const QUERY_JOB_TYPE = Object.freeze({ | ||||||||||||||||||||||||||||||||||||||||||||
SEARCH_OR_AGGREGATION: (enumQueryType = 0), | ||||||||||||||||||||||||||||||||||||||||||||
EXTRACT_IR: ++enumQueryType, | ||||||||||||||||||||||||||||||||||||||||||||
EXTRACT_JSON: ++enumQueryType, | ||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+26
to
+28
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid assignments within object initializations for clarity. Assigning values to Apply this diff to resolve the issue: -let enumQueryType;
+let enumQueryType = 0;
/* eslint-disable sort-keys */
/**
* Note: This enum is duplicated from server, as it is non-trivial to include server enums from the
* client.
*
* Enum of job types, matching the `QueryJobType` class in
* `job_orchestration.query_scheduler.constants`.
*
* @enum {number}
*/
const QUERY_JOB_TYPE = Object.freeze({
- SEARCH_OR_AGGREGATION: (enumQueryType = 0),
+ SEARCH_OR_AGGREGATION: enumQueryType++,
EXTRACT_IR: ++enumQueryType,
EXTRACT_JSON: ++enumQueryType,
});
/* eslint-enable sort-keys */ 📝 Committable suggestion
Suggested change
🧰 Tools🪛 Biome[error] 26-26: The assignment should not be in an expression. The use of assignments in expressions is confusing. (lint/suspicious/noAssignInExpressions) |
||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||
/* eslint-enable sort-keys */ | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||||||
* Mapping between job type enums and stream type | ||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||
const EXTRACT_JOB_TYPE = Object.freeze({ | ||||||||||||||||||||||||||||||||||||||||||||
ir: QUERY_JOB_TYPE.EXTRACT_IR, | ||||||||||||||||||||||||||||||||||||||||||||
json: QUERY_JOB_TYPE.EXTRACT_JSON, | ||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||||||
* Submits queries and renders the query states. | ||||||||||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -28,20 +54,30 @@ const QueryStatus = () => { | |||||||||||||||||||||||||||||||||||||||||||
isFirstRun.current = false; | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
const searchParams = new URLSearchParams(window.location.search); | ||||||||||||||||||||||||||||||||||||||||||||
const origFileId = searchParams.get("origFileId"); | ||||||||||||||||||||||||||||||||||||||||||||
const streamType = searchParams.get("type"); | ||||||||||||||||||||||||||||||||||||||||||||
const streamId = searchParams.get("streamId"); | ||||||||||||||||||||||||||||||||||||||||||||
const logEventIdx = searchParams.get("logEventIdx"); | ||||||||||||||||||||||||||||||||||||||||||||
if (null === origFileId || null === logEventIdx) { | ||||||||||||||||||||||||||||||||||||||||||||
const error = "Either `origFileId` or `logEventIdx` are missing from the URL " + | ||||||||||||||||||||||||||||||||||||||||||||
"parameters. Note that non-IR-extraction queries are not supported at the moment."; | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
if (null === streamType || null === streamId || null === logEventIdx) { | ||||||||||||||||||||||||||||||||||||||||||||
const error = "Queries parameters are missing from the URL parameters."; | ||||||||||||||||||||||||||||||||||||||||||||
console.error(error); | ||||||||||||||||||||||||||||||||||||||||||||
setErrorMsg(error); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
const extractJobType = EXTRACT_JOB_TYPE[streamType]; | ||||||||||||||||||||||||||||||||||||||||||||
if ("undefined" === typeof extractJobType) { | ||||||||||||||||||||||||||||||||||||||||||||
const error = `Unsupported Stream type: ${streamType}`; | ||||||||||||||||||||||||||||||||||||||||||||
console.error(error); | ||||||||||||||||||||||||||||||||||||||||||||
setErrorMsg(error); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
submitExtractIrJob( | ||||||||||||||||||||||||||||||||||||||||||||
origFileId, | ||||||||||||||||||||||||||||||||||||||||||||
submitExtractStreamJob( | ||||||||||||||||||||||||||||||||||||||||||||
extractJobType, | ||||||||||||||||||||||||||||||||||||||||||||
streamId, | ||||||||||||||||||||||||||||||||||||||||||||
Number(logEventIdx), | ||||||||||||||||||||||||||||||||||||||||||||
() => { | ||||||||||||||||||||||||||||||||||||||||||||
setQueryState(QUERY_LOADING_STATES.WAITING); | ||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -62,9 +62,18 @@ let enumQueryType; | |||||||||||||||||||||||||||||||||||
const QUERY_JOB_TYPE = Object.freeze({ | ||||||||||||||||||||||||||||||||||||
SEARCH_OR_AGGREGATION: (enumQueryType = 0), | ||||||||||||||||||||||||||||||||||||
EXTRACT_IR: ++enumQueryType, | ||||||||||||||||||||||||||||||||||||
EXTRACT_JSON: ++enumQueryType, | ||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||
/* eslint-enable sort-keys */ | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||
* List of valid extract job types. | ||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||
const EXTRACT_JOB_TYPES = Object.freeze([ | ||||||||||||||||||||||||||||||||||||
QUERY_JOB_TYPE.EXTRACT_IR, | ||||||||||||||||||||||||||||||||||||
QUERY_JOB_TYPE.EXTRACT_JSON, | ||||||||||||||||||||||||||||||||||||
]); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||
* Class to manage connections to the jobs database (MySQL) and results cache (MongoDB). | ||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||
|
@@ -101,20 +110,43 @@ class DbManager { | |||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||
* Submits an IR extraction job to the scheduler and waits for it to finish. | ||||||||||||||||||||||||||||||||||||
* Submits a stream extraction job to the scheduler and waits for it to finish. | ||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||
* @param {object} jobConfig | ||||||||||||||||||||||||||||||||||||
* @param {number} jobType | ||||||||||||||||||||||||||||||||||||
* @param {number} logEventIdx | ||||||||||||||||||||||||||||||||||||
* @param {string} streamId | ||||||||||||||||||||||||||||||||||||
* @param {number} targetUncompressedSize | ||||||||||||||||||||||||||||||||||||
* @return {Promise<number|null>} The ID of the job or null if an error occurred. | ||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||
async submitAndWaitForExtractIrJob (jobConfig) { | ||||||||||||||||||||||||||||||||||||
async submitAndWaitForExtractStreamJob ( | ||||||||||||||||||||||||||||||||||||
jobType, | ||||||||||||||||||||||||||||||||||||
logEventIdx, | ||||||||||||||||||||||||||||||||||||
streamId, | ||||||||||||||||||||||||||||||||||||
targetUncompressedSize | ||||||||||||||||||||||||||||||||||||
) { | ||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When there are multiple parameters to a single function, it's be better to use the object destructuring syntax:
Suggested change
The JsDoc can be updated as:
Then the method call can be modified as:
which looks more readable. What do you think? |
||||||||||||||||||||||||||||||||||||
let jobConfig; | ||||||||||||||||||||||||||||||||||||
if (QUERY_JOB_TYPE.EXTRACT_IR === jobType) { | ||||||||||||||||||||||||||||||||||||
jobConfig = { | ||||||||||||||||||||||||||||||||||||
file_split_id: null, | ||||||||||||||||||||||||||||||||||||
msg_ix: logEventIdx, | ||||||||||||||||||||||||||||||||||||
orig_file_id: streamId, | ||||||||||||||||||||||||||||||||||||
target_uncompressed_size: targetUncompressedSize, | ||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||
} else if (QUERY_JOB_TYPE.EXTRACT_JSON === jobType) { | ||||||||||||||||||||||||||||||||||||
jobConfig = { | ||||||||||||||||||||||||||||||||||||
archive_id: streamId, | ||||||||||||||||||||||||||||||||||||
target_chunk_size: targetUncompressedSize, | ||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
let jobId; | ||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||
const [result] = await this.#mysqlConnectionPool.query( | ||||||||||||||||||||||||||||||||||||
`INSERT INTO ${this.#queryJobsTableName} (job_config, type) | ||||||||||||||||||||||||||||||||||||
VALUES (?, ?)`, | ||||||||||||||||||||||||||||||||||||
[ | ||||||||||||||||||||||||||||||||||||
Buffer.from(msgpackEncode(jobConfig)), | ||||||||||||||||||||||||||||||||||||
QUERY_JOB_TYPE.EXTRACT_IR, | ||||||||||||||||||||||||||||||||||||
jobType, | ||||||||||||||||||||||||||||||||||||
] | ||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
|
@@ -130,16 +162,16 @@ class DbManager { | |||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||
* Gets the metadata for an IR file extracted from part of an original file, where the original | ||||||||||||||||||||||||||||||||||||
* file has the given ID and the extracted part contains the given log event index. | ||||||||||||||||||||||||||||||||||||
* Gets the metadata for the extracted stream that has the given streamId and contains the | ||||||||||||||||||||||||||||||||||||
* given logEventIdx. | ||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||
* @param {string} origFileId | ||||||||||||||||||||||||||||||||||||
* @param {string} streamId | ||||||||||||||||||||||||||||||||||||
* @param {number} logEventIdx | ||||||||||||||||||||||||||||||||||||
* @return {Promise<object>} A promise that resolves to the extracted IR file's metadata. | ||||||||||||||||||||||||||||||||||||
* @return {Promise<object>} A promise that resolves to the extracted stream's metadata. | ||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||
async getExtractedIrFileMetadata (origFileId, logEventIdx) { | ||||||||||||||||||||||||||||||||||||
async getExtractedStreamFileMetadata (streamId, logEventIdx) { | ||||||||||||||||||||||||||||||||||||
return await this.#streamFilesCollection.findOne({ | ||||||||||||||||||||||||||||||||||||
orig_file_id: origFileId, | ||||||||||||||||||||||||||||||||||||
orig_file_id: streamId, | ||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I haven't read the MongoDB insertion code, but it's a little odd that the key name There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Also, semantically are there any differences between a stream ID and an original file ID? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This part is tricky. "stream" is a concept I started using a few weeks ago in order to commonize IR and JSON. an IR stream is identified by it's "orig_file_id", and a JSON Line file is identified by "archive_id". When the CLP code was written, we used "orig_file_id" for IR. Later when we wrote code for JSON, somehow we kept using "orig_file_id" to represent archive_id in the metadata. (I was not involved in the design process). For now, the easiest way is to just use orig_file_id. After the release, we can either let JSON and IR use different field, or let them both use a single field "stream_id". I personally prefer the latter. @kirkrodrigues any opinion? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, we can switch it to stream_id. |
||||||||||||||||||||||||||||||||||||
begin_msg_ix: {$lte: logEventIdx}, | ||||||||||||||||||||||||||||||||||||
end_msg_ix: {$gt: logEventIdx}, | ||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||
|
@@ -236,6 +268,8 @@ class DbManager { | |||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
export {EXTRACT_JOB_TYPES}; | ||||||||||||||||||||||||||||||||||||
export {QUERY_JOB_TYPE}; | ||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||||
export default fastifyPlugin(async (app, options) => { | ||||||||||||||||||||||||||||||||||||
await app.decorate("dbManager", new DbManager(app, options)); | ||||||||||||||||||||||||||||||||||||
}); |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,5 +1,8 @@ | ||||||||||||||||||||||||||||||||||
// eslint-disable-next-line no-magic-numbers | ||||||||||||||||||||||||||||||||||
const EXTRACT_IR_TARGET_UNCOMPRESSED_SIZE = 128 * 1024 * 1024; | ||||||||||||||||||||||||||||||||||
import {StatusCodes} from "http-status-codes"; | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
import settings from "../../settings.json" with {type: "json"}; | ||||||||||||||||||||||||||||||||||
import {EXTRACT_JOB_TYPES} from "../DbManager.js"; | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||
* Creates query routes. | ||||||||||||||||||||||||||||||||||
|
@@ -9,37 +12,52 @@ const EXTRACT_IR_TARGET_UNCOMPRESSED_SIZE = 128 * 1024 * 1024; | |||||||||||||||||||||||||||||||||
* @return {Promise<void>} | ||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||
const routes = async (fastify, options) => { | ||||||||||||||||||||||||||||||||||
fastify.post("/query/extract-ir", async (req, resp) => { | ||||||||||||||||||||||||||||||||||
const {origFileId, logEventIdx} = req.body; | ||||||||||||||||||||||||||||||||||
fastify.post("/query/extract-stream", async (req, resp) => { | ||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure backward compatibility when changing endpoint URL Changing the endpoint from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's ok, we will update all client as well There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @haiqi96 Understood. Let me know if there's anything else I can assist with. (_/) |
||||||||||||||||||||||||||||||||||
const {extractJobType, logEventIdx, streamId} = req.body; | ||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's move the parameter validation checks directly below this line. |
||||||||||||||||||||||||||||||||||
const sanitizedLogEventIdx = Number(logEventIdx); | ||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add input validation for request parameters. While converting logEventIdx to a number, we should add proper validation for all input parameters to ensure robustness. Apply this diff to add comprehensive validation: - const {extractJobType, logEventIdx, streamId} = req.body;
- const sanitizedLogEventIdx = Number(logEventIdx);
+ const {extractJobType, logEventIdx, streamId} = req.body;
+
+ if (!streamId || typeof streamId !== 'string') {
+ const err = new Error('Invalid streamId');
+ err.statusCode = 400;
+ throw err;
+ }
+
+ if (!extractJobType || !Object.values(QUERY_JOB_TYPE).includes(extractJobType)) {
+ const err = new Error(`Invalid extractJobType: ${extractJobType}`);
+ err.statusCode = 400;
+ throw err;
+ }
+
+ const sanitizedLogEventIdx = Number(logEventIdx);
+ if (isNaN(sanitizedLogEventIdx) || sanitizedLogEventIdx < 0) {
+ const err = new Error('Invalid logEventIdx');
+ err.statusCode = 400;
+ throw err;
+ }
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @junhaoliao you think it would be nice to add those checks? It might hit 20 statement limit though There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree, for
and add a check for
We also want to check / sanitize the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, I was wrong about the SQL injection vulnerability. The side question @haiqi96 , does the scheduler hang when the streamId is an invalid string? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the scheduler shall return a failed job, saying the streamID doesn't resolve to any archive/file. So it should be fine There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds like my input is no longer needed, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, we are good now There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add validation for numeric conversion The - const sanitizedLogEventIdx = Number(logEventIdx);
+ const sanitizedLogEventIdx = Number(logEventIdx);
+ if (Number.isNaN(sanitizedLogEventIdx) || sanitizedLogEventIdx < 0) {
+ resp.code(StatusCodes.BAD_REQUEST);
+ throw new Error("logEventIdx must be a non-negative number");
+ } 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
let irMetadata = await fastify.dbManager.getExtractedIrFileMetadata( | ||||||||||||||||||||||||||||||||||
origFileId, | ||||||||||||||||||||||||||||||||||
if (false === EXTRACT_JOB_TYPES.includes(extractJobType)) { | ||||||||||||||||||||||||||||||||||
resp.code(StatusCodes.BAD_REQUEST); | ||||||||||||||||||||||||||||||||||
throw new Error("Invalid extractJobType"); | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
if (null === streamId) { | ||||||||||||||||||||||||||||||||||
resp.code(StatusCodes.BAD_REQUEST); | ||||||||||||||||||||||||||||||||||
throw new Error("streamId must not be null"); | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Enhance streamId validation The current validation only checks for null. Consider adding checks for undefined, empty string, and string type. - if (null === streamId) {
+ if (!streamId || typeof streamId !== 'string' || streamId.trim() === '') {
resp.code(StatusCodes.BAD_REQUEST);
- throw new Error("streamId must not be null");
+ throw new Error("streamId must be a non-empty string");
} 📝 Committable suggestion
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel what rabbit makes sense. Shall I just commit suggestions? @junhaoliao There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree. Except that we do not need to check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
let streamMetadata = await fastify.dbManager.getExtractedStreamFileMetadata( | ||||||||||||||||||||||||||||||||||
streamId, | ||||||||||||||||||||||||||||||||||
sanitizedLogEventIdx | ||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
if (null === irMetadata) { | ||||||||||||||||||||||||||||||||||
const extractResult = await fastify.dbManager.submitAndWaitForExtractIrJob({ | ||||||||||||||||||||||||||||||||||
file_split_id: null, | ||||||||||||||||||||||||||||||||||
msg_ix: sanitizedLogEventIdx, | ||||||||||||||||||||||||||||||||||
orig_file_id: origFileId, | ||||||||||||||||||||||||||||||||||
target_uncompressed_size: EXTRACT_IR_TARGET_UNCOMPRESSED_SIZE, | ||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||
if (null === streamMetadata) { | ||||||||||||||||||||||||||||||||||
const extractResult = await fastify.dbManager.submitAndWaitForExtractStreamJob( | ||||||||||||||||||||||||||||||||||
extractJobType, | ||||||||||||||||||||||||||||||||||
sanitizedLogEventIdx, | ||||||||||||||||||||||||||||||||||
streamId, | ||||||||||||||||||||||||||||||||||
settings.StreamTargetUncompressedSize, | ||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
if (null === extractResult) { | ||||||||||||||||||||||||||||||||||
const err = new Error("Unable to extract IR for file with " + | ||||||||||||||||||||||||||||||||||
`origFileId=${origFileId} at logEventIdx=${sanitizedLogEventIdx}`); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
err.statusCode = 400; | ||||||||||||||||||||||||||||||||||
throw err; | ||||||||||||||||||||||||||||||||||
resp.code(StatusCodes.BAD_REQUEST); | ||||||||||||||||||||||||||||||||||
throw new Error("Unable to extract stream with " + | ||||||||||||||||||||||||||||||||||
`streamId=${streamId} at logEventIdx=${sanitizedLogEventIdx}`); | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
irMetadata = await fastify.dbManager.getExtractedIrFileMetadata( | ||||||||||||||||||||||||||||||||||
origFileId, | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
streamMetadata = await fastify.dbManager.getExtractedStreamFileMetadata( | ||||||||||||||||||||||||||||||||||
streamId, | ||||||||||||||||||||||||||||||||||
sanitizedLogEventIdx | ||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
if (null === streamMetadata) { | ||||||||||||||||||||||||||||||||||
resp.code(StatusCodes.BAD_REQUEST); | ||||||||||||||||||||||||||||||||||
throw new Error("Unable to find the metadata of extracted stream with " + | ||||||||||||||||||||||||||||||||||
`streamId=${streamId} at logEventIdx=${sanitizedLogEventIdx}`); | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
return irMetadata; | ||||||||||||||||||||||||||||||||||
return streamMetadata; | ||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -86,7 +86,7 @@ | |
#stream_output: | ||
# directory: "var/data/streams" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we check if issue #608 is valid? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right. pushed a fix There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let also mention There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
# | ||
# # How large each IR file should be before being split into a new IR file | ||
# # How large each stream file should be before being split into a new stream file | ||
# target_uncompressed_size: 134217728 # 128 MB | ||
# | ||
## Location where other data (besides archives) are stored. It will be created if | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update ExtractJsonResp type definition to match actual JSON response structure
The current type definition appears to be copied from ExtractIrResp without considering JSON-specific differences. Based on the PR objectives:
is_last_ir_chunk
orig_file_id
archive_id
Apply this diff to fix the type definition:
📝 Committable suggestion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The rabbit understood it wrong. With that said, similar to the query job type, shall we document the source definition (i.e. the mongo db insertion code?) of this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@junhaoliao, thank you for the clarification. It seems the current
ExtractJsonResp
type definition is accurate.Documenting the source definition, such as the MongoDB insertion code, would be beneficial for future reference. You could add a comment above the typedef to indicate where this response structure is defined, for example:
// Response structure defined in server/src/routes/query.js during MongoDB insertion
This would help maintain consistency and clarity between the client and server code.
✏️ Learnings added