Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#3666 - Queue Monitoring - Refactoring Schedulers #4032

Merged
merged 6 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export class CancelApplicationAssessmentProcessor extends BaseQueue<CancelAssess
* @param job information to perform the process.
* @param processSummary process summary for logging.
* @returns processing result. */
async process(
protected async process(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

job: Job<CancelAssessmentQueueInDTO>,
processSummary: ProcessSummary,
): Promise<string> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class StartApplicationAssessmentProcessor extends BaseQueue<StartAssessme
* @param processSummary process summary for logging.
* @returns processing result.
*/
async process(
protected async process(
job: Job<StartAssessmentQueueInDTO>,
processSummary: ProcessSummary,
): Promise<string> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import { InjectQueue, Process, Processor } from "@nestjs/bull";
import { InjectQueue, Processor } from "@nestjs/bull";
import { Job, Queue } from "bull";
import { BaseScheduler } from "../base-scheduler";
import { QueueNames } from "@sims/utilities";
import { QueueService } from "@sims/services/queue";
import { ApplicationService } from "../../../services";
import {
QueueProcessSummary,
QueueProcessSummaryResult,
} from "../../models/processors.models";
import { SystemUsersService } from "@sims/services/system-users";
import { ProcessSummary } from "@sims/utilities/logger";

/**
* Process applications archiving.
Expand All @@ -27,25 +24,20 @@ export class ProcessArchiveApplicationsScheduler extends BaseScheduler<void> {

/**
* Process all the applications pending to be archived.
* @param job applications archiving job.
* @param _job applications archiving job.
* @param processSummary process summary for logging.
* @returns processing result.
*/
@Process()
async processArchiveApplications(
job: Job<void>,
): Promise<QueueProcessSummaryResult> {
const summary = new QueueProcessSummary({
appLogger: this.logger,
jobLogger: job,
});
await summary.info("Processing pending applications to be archived.");
protected async process(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor. the info message can be retained.

_job: Job<void>,
processSummary: ProcessSummary,
): Promise<string | string[]> {
const auditUser = this.systemUsersService.systemUser;
const archivedApplicationsCount =
await this.applicationService.archiveApplications(auditUser.id);
await summary.info(
`Total of applications archived: ${archivedApplicationsCount}`,
processSummary.info(
`Total applications archived: ${archivedApplicationsCount}.`,
);
await summary.info("Completed applications archiving process.");
return summary.getSummary();
return "All applications processed with success.";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,22 @@ export class ATBCResponseIntegrationScheduler extends BaseScheduler<void> {
super(schedulerQueue, queueService);
}

/**
* To be removed once the method {@link process} is implemented.
* This method "hides" the {@link Process} decorator from the base class.
*/
async processQueue(): Promise<string | string[]> {
throw new Error("Method not implemented.");
}

/**
* When implemented in a derived class, process the queue job.
* To be implemented.
*/
protected async process(): Promise<string | string[]> {
throw new Error("Method not implemented.");
}

/**
* Process all the applied disability status requests by students.
* @param job ATBC response integration job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export abstract class BaseQueue<T> {
* @param processSummary process summary for logging.
* @returns processing result.
*/
abstract process(
protected abstract process(
job: Job<T>,
processSummary: ProcessSummary,
): Promise<string | string[]>;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
import { LoggerService, OnApplicationBootstrap } from "@nestjs/common";
import { OnApplicationBootstrap } from "@nestjs/common";
import { QueueService } from "@sims/services/queue";
import { QueueNames } from "@sims/utilities";
import { ConfigService } from "@sims/utilities/config";
import { InjectLogger } from "@sims/utilities/logger";
import { InjectLogger, LoggerService } from "@sims/utilities/logger";
import { CronRepeatOptions, Queue } from "bull";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

import { v4 as uuid } from "uuid";
import * as cronParser from "cron-parser";
import { BaseQueue } from "../schedulers/base-queue";

export abstract class BaseScheduler<T> implements OnApplicationBootstrap {
export abstract class BaseScheduler<T>
extends BaseQueue<T>
implements OnApplicationBootstrap
{
constructor(
protected schedulerQueue: Queue<T>,
protected queueService: QueueService,
) {}
) {
super();
}

/**
* Get queue cron configurations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ describe(describeProcessorRootTest(QueueNames.CASSupplierIntegration), () => {
const mockedJob = mockBullJob<void>();

// Act
const result = await processor.processCASSupplierInformation(mockedJob.job);
const result = await processor.processQueue(mockedJob.job);

// Assert
expect(result).toStrictEqual([
Expand Down Expand Up @@ -118,7 +118,7 @@ describe(describeProcessorRootTest(QueueNames.CASSupplierIntegration), () => {
const mockedJob = mockBullJob<void>();

// Act
const result = await processor.processCASSupplierInformation(mockedJob.job);
const result = await processor.processQueue(mockedJob.job);

// Assert
expect(result).toStrictEqual([
Expand Down Expand Up @@ -175,15 +175,15 @@ describe(describeProcessorRootTest(QueueNames.CASSupplierIntegration), () => {
const mockedJob = mockBullJob<void>();

// Act
const result = await processor.processCASSupplierInformation(mockedJob.job);
const result = await processor.processQueue(mockedJob.job);

// Assert
expect(result).toStrictEqual([
"Process finalized with success.",
"Pending suppliers to update found: 1.",
"Records updated: 1.",
"Attention, process finalized with success but some errors and/or warnings messages may require some attention.",
"Error(s): 0, Warning(s): 1, Info: 10",
"Error(s): 0, Warning(s): 1, Info: 11",
]);
expect(
mockedJob.containLogMessages([
Expand Down Expand Up @@ -239,15 +239,15 @@ describe(describeProcessorRootTest(QueueNames.CASSupplierIntegration), () => {
const mockedJob = mockBullJob<void>();

// Act
const result = await processor.processCASSupplierInformation(mockedJob.job);
const result = await processor.processQueue(mockedJob.job);

// Assert
expect(result).toStrictEqual([
"Process finalized with success.",
"Pending suppliers to update found: 1.",
"Records updated: 1.",
"Attention, process finalized with success but some errors and/or warnings messages may require some attention.",
"Error(s): 0, Warning(s): 1, Info: 10",
"Error(s): 0, Warning(s): 1, Info: 11",
]);
expect(
mockedJob.containLogMessages([
Expand Down Expand Up @@ -299,9 +299,7 @@ describe(describeProcessorRootTest(QueueNames.CASSupplierIntegration), () => {
const mockedJob = mockBullJob<void>();

// Act
const result = await processor.processCASSupplierInformation(
mockedJob.job,
);
const result = await processor.processQueue(mockedJob.job);

// Assert
expect(result).toStrictEqual([
Expand Down Expand Up @@ -426,7 +424,7 @@ describe(describeProcessorRootTest(QueueNames.CASSupplierIntegration), () => {
const mockedJob = mockBullJob<void>();

// Act
const result = await processor.processCASSupplierInformation(mockedJob.job);
const result = await processor.processQueue(mockedJob.job);

// Assert
expect(result).toStrictEqual([
Expand All @@ -436,7 +434,7 @@ describe(describeProcessorRootTest(QueueNames.CASSupplierIntegration), () => {
]);
expect(
mockedJob.containLogMessages([
"Executing CAS supplier integration...",
"Executing CAS supplier integration.",
"Found 1 records to be updated.",
`Processing student CAS supplier ID: ${savedCASSupplier.id}.`,
`CAS evaluation result status: ${CASEvaluationStatus.ActiveSupplierFound}.`,
Expand Down Expand Up @@ -550,7 +548,7 @@ describe(describeProcessorRootTest(QueueNames.CASSupplierIntegration), () => {
const mockedJob = mockBullJob<void>();

// Act
const result = await processor.processCASSupplierInformation(mockedJob.job);
const result = await processor.processQueue(mockedJob.job);

// Assert
expect(result).toStrictEqual([
Expand All @@ -560,7 +558,7 @@ describe(describeProcessorRootTest(QueueNames.CASSupplierIntegration), () => {
]);
expect(
mockedJob.containLogMessages([
"Executing CAS supplier integration...",
"Executing CAS supplier integration.",
"Found 1 records to be updated.",
`Processing student CAS supplier ID: ${savedCASSupplier.id}.`,
`CAS evaluation result status: ${CASEvaluationStatus.ActiveSupplierFound}.`,
Expand Down Expand Up @@ -663,9 +661,7 @@ describe(describeProcessorRootTest(QueueNames.CASSupplierIntegration), () => {
const mockedJob = mockBullJob<void>();

// Act
await expect(
processor.processCASSupplierInformation(mockedJob.job),
).rejects.toStrictEqual(
await expect(processor.processQueue(mockedJob.job)).rejects.toStrictEqual(
new Error(
"One or more errors were reported during the process, please see logs for details.",
),
Expand All @@ -674,7 +670,7 @@ describe(describeProcessorRootTest(QueueNames.CASSupplierIntegration), () => {
// Assert
expect(
mockedJob.containLogMessages([
"Executing CAS supplier integration...",
"Executing CAS supplier integration.",
"Found 2 records to be updated.",
`Processing student CAS supplier ID: ${savedCASSupplierToFail.id}.`,
'Unexpected error while processing supplier. "Unknown error"',
Expand Down Expand Up @@ -706,15 +702,15 @@ describe(describeProcessorRootTest(QueueNames.CASSupplierIntegration), () => {
const mockedJob = mockBullJob<void>();

// Act
const result = await processor.processCASSupplierInformation(mockedJob.job);
const result = await processor.processQueue(mockedJob.job);

// Assert
expect(result).toStrictEqual([
"Process finalized with success.",
"Pending suppliers to update found: 1.",
"Records updated: 0.",
"Attention, process finalized with success but some errors and/or warnings messages may require some attention.",
"Error(s): 0, Warning(s): 1, Info: 12",
"Error(s): 0, Warning(s): 1, Info: 13",
]);
// Assert DB was updated.
const updateCASSupplier = await db.casSupplier.findOne({
Expand Down Expand Up @@ -774,15 +770,15 @@ describe(describeProcessorRootTest(QueueNames.CASSupplierIntegration), () => {
const mockedJob = mockBullJob<void>();

// Act
const result = await processor.processCASSupplierInformation(mockedJob.job);
const result = await processor.processQueue(mockedJob.job);

// Assert
expect(result).toStrictEqual([
"Process finalized with success.",
"Pending suppliers to update found: 1.",
"Records updated: 0.",
"Attention, process finalized with success but some errors and/or warnings messages may require some attention.",
"Error(s): 0, Warning(s): 2, Info: 11",
"Error(s): 0, Warning(s): 2, Info: 12",
]);
// Assert DB was updated.
const updateCASSupplier = await db.casSupplier.findOne({
Expand Down Expand Up @@ -838,15 +834,15 @@ describe(describeProcessorRootTest(QueueNames.CASSupplierIntegration), () => {
const mockedJob = mockBullJob<void>();

// Act
const result = await processor.processCASSupplierInformation(mockedJob.job);
const result = await processor.processQueue(mockedJob.job);

// Assert
expect(result).toStrictEqual([
"Process finalized with success.",
"Pending suppliers to update found: 1.",
"Records updated: 0.",
"Attention, process finalized with success but some errors and/or warnings messages may require some attention.",
"Error(s): 0, Warning(s): 2, Info: 11",
"Error(s): 0, Warning(s): 2, Info: 12",
]);
// Assert DB was updated.
const updateCASSupplier = await db.casSupplier.findOne({
Expand Down
Loading
Loading