Skip to content

Commit

Permalink
address code review
Browse files Browse the repository at this point in the history
  • Loading branch information
haiqi96 committed Nov 21, 2024
1 parent d06be72 commit b9b8440
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 62 deletions.
35 changes: 33 additions & 2 deletions components/log-viewer-webui/server/src/DbManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ const QUERY_JOB_TYPE = Object.freeze({
});
/* eslint-enable sort-keys */

/**
 * The query job types that are accepted for stream extraction requests.
 *
 * @type {ReadonlyArray<number>}
 */
const EXTRACT_JOB_TYPES = Object.freeze(
    [QUERY_JOB_TYPE.EXTRACT_IR, QUERY_JOB_TYPE.EXTRACT_JSON]
);

/**
* Class to manage connections to the jobs database (MySQL) and results cache (MongoDB).
*/
Expand Down Expand Up @@ -104,11 +112,33 @@ class DbManager {
/**
* Submits a stream extraction job to the scheduler and waits for it to finish.
*
* @param {object} jobConfig
* @param {number} jobType
* @param {number} logEventIdx
* @param {string} streamId
* @param {number} targetUncompressedSize
* @return {Promise<number|null>} The ID of the job or null if an error occurred.
*/
async submitAndWaitForExtractStreamJob (jobConfig, jobType) {
async submitAndWaitForExtractStreamJob (
jobType,
logEventIdx,
streamId,
targetUncompressedSize
) {
let jobConfig;
if (QUERY_JOB_TYPE.EXTRACT_IR === jobType) {
jobConfig = {
file_split_id: null,
msg_ix: logEventIdx,
orig_file_id: streamId,
target_uncompressed_size: targetUncompressedSize,
};
} else if (QUERY_JOB_TYPE.EXTRACT_JSON === jobType) {
jobConfig = {
archive_id: streamId,
target_chunk_size: targetUncompressedSize,
};
}

let jobId;
try {
const [result] = await this.#mysqlConnectionPool.query(
Expand Down Expand Up @@ -238,6 +268,7 @@ class DbManager {
}
}

export {EXTRACT_JOB_TYPES};
export {QUERY_JOB_TYPE};
export default fastifyPlugin(async (app, options) => {
await app.decorate("dbManager", new DbManager(app, options));
Expand Down
87 changes: 27 additions & 60 deletions components/log-viewer-webui/server/src/routes/query.js
Original file line number Diff line number Diff line change
@@ -1,57 +1,8 @@
import settings from "../../settings.json" with {type: "json"};
import {QUERY_JOB_TYPE} from "../DbManager.js";


/**
 * Submits a stream extraction job with the given parameters and waits for it.
 *
 * @param {import("fastify").FastifyInstance | {dbManager: DbManager}} fastify
 * @param {QUERY_JOB_TYPE} extractJobType
 * @param {string} streamId
 * @param {int} sanitizedLogEventIdx
 * @throws {Error} with `statusCode` 400 if `extractJobType` is unsupported or if the extract
 * stream job submission fails.
 */
const submitAndWaitForExtractStreamJob = async (
    fastify,
    extractJobType,
    streamId,
    sanitizedLogEventIdx
) => {
    // Initialize to `null` so the unsupported-type guard below can fire. A bare
    // `let jobConfig;` would leave the variable `undefined`, and the strict
    // `null === jobConfig` comparison would never match, letting an unsupported
    // job type slip through to the scheduler with an `undefined` config.
    let jobConfig = null;
    const streamTargetUncompressedSize = settings.StreamTargetUncompressedSize;
    if (QUERY_JOB_TYPE.EXTRACT_IR === extractJobType) {
        jobConfig = {
            file_split_id: null,
            msg_ix: sanitizedLogEventIdx,
            orig_file_id: streamId,
            target_uncompressed_size: streamTargetUncompressedSize,
        };
    } else if (QUERY_JOB_TYPE.EXTRACT_JSON === extractJobType) {
        jobConfig = {
            archive_id: streamId,
            target_chunk_size: streamTargetUncompressedSize,
        };
    }

    if (null === jobConfig) {
        const err = new Error(`Unsupported Job type: ${extractJobType}`);
        err.statusCode = 400;
        throw err;
    }

    const extractResult = await fastify.dbManager.submitAndWaitForExtractStreamJob(
        jobConfig,
        extractJobType
    );

    if (null === extractResult) {
        const err = new Error("Unable to extract stream with " +
            `streamId=${streamId} at logEventIdx=${sanitizedLogEventIdx}`);

        err.statusCode = 400;
        throw err;
    }
};
import {EXTRACT_JOB_TYPES} from "../DbManager.js";
import {
StatusCodes,
} from 'http-status-codes';

/**
* Creates query routes.
Expand All @@ -64,29 +15,45 @@ const routes = async (fastify, options) => {
fastify.post("/query/extract-stream", async (req, resp) => {
const {extractJobType, logEventIdx, streamId} = req.body;
const sanitizedLogEventIdx = Number(logEventIdx);

if (false === EXTRACT_JOB_TYPES.includes(extractJobType)) {
resp.code(StatusCodes.BAD_REQUEST);
throw new Error("Invalid extractJobType");
}

if (null === streamId) {
resp.code(StatusCodes.BAD_REQUEST);
throw new Error("streamId must not be null");
}

let streamMetadata = await fastify.dbManager.getExtractedStreamFileMetadata(
streamId,
sanitizedLogEventIdx
);

if (null === streamMetadata) {
await submitAndWaitForExtractStreamJob(
fastify,
const extractResult = await fastify.dbManager.submitAndWaitForExtractStreamJob(
extractJobType,
sanitizedLogEventIdx,
streamId,
sanitizedLogEventIdx
settings.StreamTargetUncompressedSize,
);

if (null === extractResult) {
resp.code(StatusCodes.BAD_REQUEST);
throw new Error("Unable to extract stream with " +
`streamId=${streamId} at logEventIdx=${sanitizedLogEventIdx}`);
}

streamMetadata = await fastify.dbManager.getExtractedStreamFileMetadata(
streamId,
sanitizedLogEventIdx
);

if (null === streamMetadata) {
const err = new Error("Unable to find the metadata of extracted stream with " +
resp.code(StatusCodes.BAD_REQUEST);
throw new Error("Unable to find the metadata of extracted stream with " +
`streamId=${streamId} at logEventIdx=${sanitizedLogEventIdx}`);

err.statusCode = 400;
throw err;
}
}

Expand Down

0 comments on commit b9b8440

Please sign in to comment.