From 84f736643c13dd7335c8475645683715169e5ead Mon Sep 17 00:00:00 2001 From: haiqi96 <14502009+haiqi96@users.noreply.github.com> Date: Thu, 21 Nov 2024 18:05:06 -0500 Subject: [PATCH] feat(webui): Support viewing search results in context for JSON logs (clp-json). (#596) Co-authored-by: Junhao Liao --- .../clp_package_utils/scripts/start_clp.py | 1 + .../clp-py-utils/clp_py_utils/clp_config.py | 2 +- .../log-viewer-webui/client/src/api/query.js | 29 ++++++--- .../client/src/ui/QueryStatus.jsx | 50 +++++++++++++--- .../log-viewer-webui/server/settings.json | 1 + .../log-viewer-webui/server/src/DbManager.js | 56 ++++++++++++++---- .../server/src/routes/query.js | 59 ++++++++++++------- .../package-template/src/etc/clp-config.yml | 2 +- .../SearchResultsTable/index.jsx | 55 +++++++++++------ 9 files changed, 187 insertions(+), 68 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py index a25756fd9..6732ded0b 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py @@ -936,6 +936,7 @@ def start_log_viewer_webui( "MongoDbStreamFilesCollectionName": clp_config.results_cache.stream_collection_name, "ClientDir": str(container_log_viewer_webui_dir / "client"), "StreamFilesDir": str(container_clp_config.stream_output.directory), + "StreamTargetUncompressedSize": container_clp_config.stream_output.target_uncompressed_size, "LogViewerDir": str(container_log_viewer_webui_dir / "yscope-log-viewer"), } settings_json = read_and_update_settings_json(settings_json_path, settings_json_updates) diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py index 4bb00bd9e..79a94505d 100644 --- a/components/clp-py-utils/clp_py_utils/clp_config.py +++ b/components/clp-py-utils/clp_py_utils/clp_config.py @@ -346,7 +346,7 @@ def dump_to_primitive_dict(self): class StreamOutput(BaseModel): - directory: pathlib.Path = pathlib.Path("var") / "data" / "stream" + directory: pathlib.Path = pathlib.Path("var") / "data" / "streams" target_uncompressed_size: int = 128 * 1024 * 1024 @validator("directory") diff --git a/components/log-viewer-webui/client/src/api/query.js b/components/log-viewer-webui/client/src/api/query.js index be17d0fc8..eda1db21c 100644 --- a/components/log-viewer-webui/client/src/api/query.js +++ b/components/log-viewer-webui/client/src/api/query.js @@ -13,20 +13,31 @@ import axios from "axios"; */ /** - * Submits a job to extract the split of an original file that contains a given log event. The file - * is extracted as a CLP IR file. + * @typedef {object} ExtractJsonResp + * @property {number} begin_msg_ix + * @property {number} end_msg_ix + * @property {boolean} is_last_ir_chunk + * @property {string} orig_file_id + * @property {string} path + * @property {string} _id + */ + +/** + * Submits a job to extract the stream that contains a given log event. The stream is extracted + * either as a CLP IR or a JSON Lines file. * - * @param {number|string} origFileId The ID of the original file - * @param {number} logEventIdx The index of the log event + * @param {QUERY_JOB_TYPE} extractJobType + * @param {string} streamId + * @param {number} logEventIdx * @param {Function} onUploadProgress Callback to handle upload progress events. - * @return {Promise>} + * @return {Promise>} */ -const submitExtractIrJob = async (origFileId, logEventIdx, onUploadProgress) => { +const submitExtractStreamJob = async (extractJobType, streamId, logEventIdx, onUploadProgress) => { return await axios.post( - "/query/extract-ir", - {logEventIdx, origFileId}, + "/query/extract-stream", + {extractJobType, streamId, logEventIdx}, {onUploadProgress} ); }; -export {submitExtractIrJob}; +export {submitExtractStreamJob}; diff --git a/components/log-viewer-webui/client/src/ui/QueryStatus.jsx b/components/log-viewer-webui/client/src/ui/QueryStatus.jsx index c1bb639a6..56a995980 100644 --- a/components/log-viewer-webui/client/src/ui/QueryStatus.jsx +++ b/components/log-viewer-webui/client/src/ui/QueryStatus.jsx @@ -6,11 +6,37 @@ import { import {AxiosError} from "axios"; -import {submitExtractIrJob} from "../api/query.js"; +import {submitExtractStreamJob} from "../api/query.js"; import {QUERY_LOADING_STATES} from "../typings/query.js"; import Loading from "./Loading.jsx"; +let enumQueryType; +/* eslint-disable sort-keys */ +/** + * Note: This enum is duplicated from server, as it is non-trivial to include server enums from the + * client. + * + * Enum of job types, matching the `QueryJobType` class in + * `job_orchestration.query_scheduler.constants`. + * + * @enum {number} + */ +const QUERY_JOB_TYPE = Object.freeze({ + SEARCH_OR_AGGREGATION: (enumQueryType = 0), + EXTRACT_IR: ++enumQueryType, + EXTRACT_JSON: ++enumQueryType, +}); +/* eslint-enable sort-keys */ + +/** + * Mapping between job type enums and stream type + */ +const EXTRACT_JOB_TYPE = Object.freeze({ + ir: QUERY_JOB_TYPE.EXTRACT_IR, + json: QUERY_JOB_TYPE.EXTRACT_JSON, +}); + /** * Submits queries and renders the query states. * @@ -28,20 +54,30 @@ const QueryStatus = () => { isFirstRun.current = false; const searchParams = new URLSearchParams(window.location.search); - const origFileId = searchParams.get("origFileId"); + const streamType = searchParams.get("type"); + const streamId = searchParams.get("streamId"); const logEventIdx = searchParams.get("logEventIdx"); - if (null === origFileId || null === logEventIdx) { - const error = "Either `origFileId` or `logEventIdx` are missing from the URL " + - "parameters. Note that non-IR-extraction queries are not supported at the moment."; + if (null === streamType || null === streamId || null === logEventIdx) { + const error = "Queries parameters are missing from the URL parameters."; + console.error(error); + setErrorMsg(error); + + return; + } + + const extractJobType = EXTRACT_JOB_TYPE[streamType]; + if ("undefined" === typeof extractJobType) { + const error = `Unsupported Stream type: ${streamType}`; console.error(error); setErrorMsg(error); return; } - submitExtractIrJob( - origFileId, + submitExtractStreamJob( + extractJobType, + streamId, Number(logEventIdx), () => { setQueryState(QUERY_LOADING_STATES.WAITING); diff --git a/components/log-viewer-webui/server/settings.json b/components/log-viewer-webui/server/settings.json index 163f9a9e2..9e749e144 100644 --- a/components/log-viewer-webui/server/settings.json +++ b/components/log-viewer-webui/server/settings.json @@ -10,5 +10,6 @@ "ClientDir": "../client/dist", "StreamFilesDir": "../../../build/clp-package/var/data/streams", + "StreamTargetUncompressedSize": 134217728, "LogViewerDir": "../yscope-log-viewer/dist" } diff --git a/components/log-viewer-webui/server/src/DbManager.js b/components/log-viewer-webui/server/src/DbManager.js index 79e097280..e1ec00812 100644 --- a/components/log-viewer-webui/server/src/DbManager.js +++ b/components/log-viewer-webui/server/src/DbManager.js @@ -62,9 +62,18 @@ let enumQueryType; const QUERY_JOB_TYPE = Object.freeze({ SEARCH_OR_AGGREGATION: (enumQueryType = 0), EXTRACT_IR: ++enumQueryType, + EXTRACT_JSON: ++enumQueryType, }); /* eslint-enable sort-keys */ +/** + * List of valid extract job types. + */ +const EXTRACT_JOB_TYPES = Object.freeze([ + QUERY_JOB_TYPE.EXTRACT_IR, + QUERY_JOB_TYPE.EXTRACT_JSON, +]); + /** * Class to manage connections to the jobs database (MySQL) and results cache (MongoDB). */ @@ -101,12 +110,35 @@ class DbManager { } /** - * Submits an IR extraction job to the scheduler and waits for it to finish. + * Submits a stream extraction job to the scheduler and waits for it to finish. * - * @param {object} jobConfig + * @param {number} jobType + * @param {number} logEventIdx + * @param {string} streamId + * @param {number} targetUncompressedSize * @return {Promise} The ID of the job or null if an error occurred. */ - async submitAndWaitForExtractIrJob (jobConfig) { + async submitAndWaitForExtractStreamJob ({ + jobType, + logEventIdx, + streamId, + targetUncompressedSize, + }) { + let jobConfig; + if (QUERY_JOB_TYPE.EXTRACT_IR === jobType) { + jobConfig = { + file_split_id: null, + msg_ix: logEventIdx, + orig_file_id: streamId, + target_uncompressed_size: targetUncompressedSize, + }; + } else if (QUERY_JOB_TYPE.EXTRACT_JSON === jobType) { + jobConfig = { + archive_id: streamId, + target_chunk_size: targetUncompressedSize, + }; + } + let jobId; try { const [result] = await this.#mysqlConnectionPool.query( @@ -114,7 +146,7 @@ class DbManager { VALUES (?, ?)`, [ Buffer.from(msgpackEncode(jobConfig)), - QUERY_JOB_TYPE.EXTRACT_IR, + jobType, ] ); @@ -130,16 +162,16 @@ class DbManager { } /** - * Gets the metadata for an IR file extracted from part of an original file, where the original - * file has the given ID and the extracted part contains the given log event index. + * Gets the metadata for the extracted stream that has the given streamId and contains the + * given logEventIdx. * - * @param {string} origFileId + * @param {string} streamId * @param {number} logEventIdx - * @return {Promise} A promise that resolves to the extracted IR file's metadata. + * @return {Promise} A promise that resolves to the extracted stream's metadata. */ - async getExtractedIrFileMetadata (origFileId, logEventIdx) { + async getExtractedStreamFileMetadata (streamId, logEventIdx) { return await this.#streamFilesCollection.findOne({ - orig_file_id: origFileId, + orig_file_id: streamId, begin_msg_ix: {$lte: logEventIdx}, end_msg_ix: {$gt: logEventIdx}, }); @@ -236,6 +268,10 @@ class DbManager { } } +export { + EXTRACT_JOB_TYPES, + QUERY_JOB_TYPE, +}; export default fastifyPlugin(async (app, options) => { await app.decorate("dbManager", new DbManager(app, options)); }); diff --git a/components/log-viewer-webui/server/src/routes/query.js b/components/log-viewer-webui/server/src/routes/query.js index 628bcd053..452f6480a 100644 --- a/components/log-viewer-webui/server/src/routes/query.js +++ b/components/log-viewer-webui/server/src/routes/query.js @@ -1,5 +1,8 @@ -// eslint-disable-next-line no-magic-numbers -const EXTRACT_IR_TARGET_UNCOMPRESSED_SIZE = 128 * 1024 * 1024; +import {StatusCodes} from "http-status-codes"; + +import settings from "../../settings.json" with {type: "json"}; +import {EXTRACT_JOB_TYPES} from "../DbManager.js"; + /** * Creates query routes. @@ -9,37 +12,51 @@ const EXTRACT_IR_TARGET_UNCOMPRESSED_SIZE = 128 * 1024 * 1024; * @return {Promise} */ const routes = async (fastify, options) => { - fastify.post("/query/extract-ir", async (req, resp) => { - const {origFileId, logEventIdx} = req.body; - const sanitizedLogEventIdx = Number(logEventIdx); + fastify.post("/query/extract-stream", async (req, resp) => { + const {extractJobType, logEventIdx, streamId} = req.body; + if (false === EXTRACT_JOB_TYPES.includes(extractJobType)) { + resp.code(StatusCodes.BAD_REQUEST); + throw new Error(`Invalid extractJobType="${extractJobType}".`); + } + + if ("string" !== typeof streamId || 0 === streamId.trim().length) { + resp.code(StatusCodes.BAD_REQUEST); + throw new Error("\"streamId\" must be a non-empty string."); + } - let irMetadata = await fastify.dbManager.getExtractedIrFileMetadata( - origFileId, + const sanitizedLogEventIdx = Number(logEventIdx); + let streamMetadata = await fastify.dbManager.getExtractedStreamFileMetadata( + streamId, sanitizedLogEventIdx ); - if (null === irMetadata) { - const extractResult = await fastify.dbManager.submitAndWaitForExtractIrJob({ - file_split_id: null, - msg_ix: sanitizedLogEventIdx, - orig_file_id: origFileId, - target_uncompressed_size: EXTRACT_IR_TARGET_UNCOMPRESSED_SIZE, + if (null === streamMetadata) { + const extractResult = await fastify.dbManager.submitAndWaitForExtractStreamJob({ + jobType: extractJobType, + logEventIdx: sanitizedLogEventIdx, + streamId: streamId, + targetUncompressedSize: settings.StreamTargetUncompressedSize, }); if (null === extractResult) { - const err = new Error("Unable to extract IR for file with " + - `origFileId=${origFileId} at logEventIdx=${sanitizedLogEventIdx}`); - - err.statusCode = 400; - throw err; + resp.code(StatusCodes.BAD_REQUEST); + throw new Error("Unable to extract stream with " + + `streamId=${streamId} at logEventIdx=${sanitizedLogEventIdx}`); } - irMetadata = await fastify.dbManager.getExtractedIrFileMetadata( - origFileId, + + streamMetadata = await fastify.dbManager.getExtractedStreamFileMetadata( + streamId, sanitizedLogEventIdx ); + + if (null === streamMetadata) { + resp.code(StatusCodes.BAD_REQUEST); + throw new Error("Unable to find the metadata of extracted stream with " + + `streamId=${streamId} at logEventIdx=${sanitizedLogEventIdx}`); + } } - return irMetadata; + return streamMetadata; }); }; diff --git a/components/package-template/src/etc/clp-config.yml b/components/package-template/src/etc/clp-config.yml index 15747fe42..f19b93463 100644 --- a/components/package-template/src/etc/clp-config.yml +++ b/components/package-template/src/etc/clp-config.yml @@ -86,7 +86,7 @@ #stream_output: # directory: "var/data/streams" # -# # How large each IR file should be before being split into a new IR file +# # How large each stream file should be before being split into a new stream file # target_uncompressed_size: 134217728 # 128 MB # ## Location where other data (besides archives) are stored. It will be created if diff --git a/components/webui/imports/ui/SearchView/SearchResults/SearchResultsTable/index.jsx b/components/webui/imports/ui/SearchView/SearchResults/SearchResultsTable/index.jsx index 83b251cca..c7fa62a25 100644 --- a/components/webui/imports/ui/SearchView/SearchResults/SearchResultsTable/index.jsx +++ b/components/webui/imports/ui/SearchView/SearchResults/SearchResultsTable/index.jsx @@ -28,6 +28,25 @@ import "./SearchResultsTable.scss"; */ const SEARCH_RESULT_MESSAGE_LINE_HEIGHT = 1.5; +const IS_IR_STREAM = ("clp" === Meteor.settings.public.ClpStorageEngine); +const STREAM_TYPE = IS_IR_STREAM ? + "ir" : + "json"; + + +/** + * Gets the stream id for an extraction job from the search result. + * + * @param {object} searchResult + * @return {string} stream_id + */ +const getStreamId = (searchResult) => { + return IS_IR_STREAM ? + searchResult.orig_file_id : + searchResult.archive_id; +}; + + /** * Represents a table component to display search results. * @@ -93,10 +112,6 @@ const SearchResultsTable = ({ ); }, [maxLinesPerResult]); - // eslint-disable-next-line no-warning-comments - // TODO: remove this flag once "Extract IR" support is added for ClpStorageEngine "clp-s" - const isExtractIrSupported = ("clp" === Meteor.settings.public.ClpStorageEngine); - return (
{result.message} - {isExtractIrSupported && - } + > + {IS_IR_STREAM ? + result.orig_file_path : + "Original File"} + + ))}