Skip to content

Commit

Permalink
#3666 - Queue Monitoring - Adjusted Queues Cleanup Period (#4021)
Browse files Browse the repository at this point in the history
As per the conversation on the [Teams
chat](https://teams.microsoft.com/l/message/19:c91c9c0acf5f44dcab6789f870ddf47e@thread.tacv2/1733175923577?tenantId=6fdb5200-3d0d-4a8a-b036-d3685e359adc&groupId=454b1d3b-af0f-4b44-b891-f4320c85d290&parentMessageId=1733175923577&teamName=External%3A%20AEST-SIMS&channelName=DEVS&createdTime=1733175923577),
adjusted the queues to use the out-of-box cleanup configuration.

- Configuration reference:
https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#keepjobs-options
- For simplicity, not changing DB settings in this PR, just adapting the
existing one to allow the removal of the methods. Currently, only
schedulers were executing the cleanup, with this change, all the queues
will have the completed tasks removed as per the existing DB setting.
andrewsignori-aot authored Dec 4, 2024
1 parent b71d175 commit 9c39d9e
Showing 29 changed files with 5 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -46,7 +46,6 @@ export class ProcessArchiveApplicationsScheduler extends BaseScheduler<void> {
`Total of applications archived: ${archivedApplicationsCount}`,
);
await summary.info("Completed applications archiving process.");
await this.cleanSchedulerQueueHistory();
return summary.getSummary();
}
}
Original file line number Diff line number Diff line change
@@ -37,7 +37,6 @@ export class ATBCResponseIntegrationScheduler extends BaseScheduler<void> {
const processingResult =
await this.atbcIntegrationProcessingService.processAppliedDisabilityRequests();
await summary.info("Completed processing disability status.");
await this.cleanSchedulerQueueHistory();
return processingResult;
}
}
Original file line number Diff line number Diff line change
@@ -13,21 +13,6 @@ export abstract class BaseScheduler<T> implements OnApplicationBootstrap {
protected queueService: QueueService,
) {}

/**
* Clean the queue scheduler history.
*/
protected async cleanSchedulerQueueHistory(): Promise<void> {
try {
const queueCleanUpPeriod = await this.queueService.getQueueCleanUpPeriod(
this.schedulerQueue.name as QueueNames,
);
await this.schedulerQueue.clean(queueCleanUpPeriod, "completed");
} catch (error: unknown) {
// Fail silently.
this.logger.error(error);
}
}

/**
* Get queue cron configurations.
*/
Original file line number Diff line number Diff line change
@@ -75,7 +75,6 @@ export class CASSupplierIntegrationScheduler extends BaseScheduler<void> {
} finally {
this.logger.logProcessSummary(processSummary);
await logProcessSummaryToJobLogger(processSummary, job);
await this.cleanSchedulerQueueHistory();
}
}

Original file line number Diff line number Diff line change
@@ -35,7 +35,6 @@ export class CRAProcessIntegrationScheduler extends BaseScheduler<void> {
this.logger.log("Executing income validation...");
const uploadResult = await this.cra.createIncomeVerificationRequest();
this.logger.log("Income validation executed.");
await this.cleanSchedulerQueueHistory();
return {
generatedFile: uploadResult.generatedFile,
uploadedRecords: uploadResult.uploadedRecords,
Original file line number Diff line number Diff line change
@@ -29,7 +29,6 @@ export class CRAResponseIntegrationScheduler extends BaseScheduler<void> {
`Processing CRA integration job ${job.id} of type ${job.name}.`,
);
const results = await this.cra.processResponses();
await this.cleanSchedulerQueueHistory();
return results.map((result) => {
return {
processSummary: result.processSummary,
Original file line number Diff line number Diff line change
@@ -65,7 +65,6 @@ export class ApplicationChangesReportIntegrationScheduler extends BaseScheduler<
} finally {
this.logger.logProcessSummary(processSummary);
await logProcessSummaryToJobLogger(processSummary, job);
await this.cleanSchedulerQueueHistory();
}
}

Original file line number Diff line number Diff line change
@@ -36,7 +36,6 @@ export class DisbursementReceiptsFileIntegrationScheduler extends BaseScheduler<
const auditUser = this.systemUsersService.systemUser;
const processResponse =
await this.disbursementReceiptProcessingService.process(auditUser.id);
await this.cleanSchedulerQueueHistory();
processSummary.info(
`Completed full time disbursement receipts integration job ${job.id} of type ${job.name}.`,
);
Original file line number Diff line number Diff line change
@@ -51,7 +51,6 @@ export class FullTimeECertFeedbackIntegrationScheduler extends BaseScheduler<voi
} finally {
this.logger.logProcessSummary(processSummary);
await logProcessSummaryToJobLogger(processSummary, job);
await this.cleanSchedulerQueueHistory();
}
}

Original file line number Diff line number Diff line change
@@ -53,7 +53,6 @@ export class PartTimeECertFeedbackIntegrationScheduler extends BaseScheduler<voi
} finally {
this.logger.logProcessSummary(processSummary);
await logProcessSummaryToJobLogger(processSummary, job);
await this.cleanSchedulerQueueHistory();
}
}

Original file line number Diff line number Diff line change
@@ -65,7 +65,6 @@ export abstract class ECertProcessIntegrationBaseScheduler extends BaseScheduler
} finally {
this.logger.logProcessSummary(processSummary);
await logProcessSummaryToJobLogger(processSummary, job);
await this.cleanSchedulerQueueHistory();
}
}

Original file line number Diff line number Diff line change
@@ -37,7 +37,6 @@ export class FederalRestrictionsIntegrationScheduler extends BaseScheduler<void>
await summary.info("Starting federal restrictions import...");
const uploadResult = await this.fedRestrictionProcessingService.process();
await summary.info("Federal restrictions import process finished.");
await this.cleanSchedulerQueueHistory();
await summary.info(
`Completed federal restriction integration job ${job.id} of type ${job.name}.`,
);
Original file line number Diff line number Diff line change
@@ -44,7 +44,6 @@ export class FullTimeMSFAAProcessIntegrationScheduler extends BaseScheduler<void
OfferingIntensity.fullTime,
);
await summary.info("MSFAA request file sent.");
await this.cleanSchedulerQueueHistory();
await summary.info(
`Completed MSFAA Full-time integration job ${job.id} of type ${job.name}.`,
);
Original file line number Diff line number Diff line change
@@ -36,7 +36,6 @@ export class FullTimeMSFAAProcessResponseIntegrationScheduler extends BaseSchedu
const results = await this.msfaaResponseService.processResponses(
OfferingIntensity.fullTime,
);
await this.cleanSchedulerQueueHistory();
await summary.info(
`Completed MSFAA Full-time integration job ${job.id} of type ${job.name}.`,
);
Original file line number Diff line number Diff line change
@@ -43,7 +43,6 @@ export class PartTimeMSFAAProcessIntegrationScheduler extends BaseScheduler<void
OfferingIntensity.partTime,
);
await summary.info("MSFAA request file sent.");
await this.cleanSchedulerQueueHistory();
await summary.info(
`Completed MSFAA Part-time integration job ${job.id} of type ${job.name}.`,
);
Original file line number Diff line number Diff line change
@@ -36,7 +36,6 @@ export class PartTimeMSFAAProcessResponseIntegrationScheduler extends BaseSchedu
const results = await this.msfaaResponseService.processResponses(
OfferingIntensity.partTime,
);
await this.cleanSchedulerQueueHistory();
await summary.info(
`Completed MSFAA Part-time integration job ${job.id} of type ${job.name}.`,
);
Original file line number Diff line number Diff line change
@@ -37,7 +37,6 @@ export class SINValidationProcessIntegrationScheduler extends BaseScheduler<void
const uploadResult =
await this.sinValidationProcessingService.uploadSINValidationRequests();
await summary.info("ESDC SIN validation request file sent.");
await this.cleanSchedulerQueueHistory();
await summary.info(
`Completed SIN validation integration job ${job.id} of type ${job.name}.`,
);
Original file line number Diff line number Diff line change
@@ -42,7 +42,6 @@ export class SINValidationResponseIntegrationScheduler extends BaseScheduler<voi
auditUser.id,
);
await summary.info("ESDC SIN validation response files processed.");
await this.cleanSchedulerQueueHistory();
await summary.info(
`Completed SIN validation integration job ${job.id} of type ${job.name}.`,
);
Original file line number Diff line number Diff line change
@@ -60,7 +60,6 @@ export class StudentLoanBalancesPartTimeIntegrationScheduler extends BaseSchedul
} finally {
this.logger.logProcessSummary(processSummary);
await logProcessSummaryToJobLogger(processSummary, job);
await this.cleanSchedulerQueueHistory();
}
}

Original file line number Diff line number Diff line change
@@ -34,7 +34,6 @@ export class ECEProcessIntegrationScheduler extends BaseScheduler<void> {
this.logger.log("Executing ECE request file generation ...");
const uploadResults = await this.eceFileService.processECEFile();
this.logger.log("ECE request file generation completed.");
await this.cleanSchedulerQueueHistory();

return uploadResults.map(
(uploadResult) =>
Original file line number Diff line number Diff line change
@@ -33,7 +33,6 @@ export class ECEResponseIntegrationScheduler extends BaseScheduler<void> {
this.logger.log("Processing ECE response files ...");
const processingResult = await this.eceResponseProcessingService.process();
this.logger.log("Processing ECE response files completed.");
await this.cleanSchedulerQueueHistory();
return processingResult;
}

Original file line number Diff line number Diff line change
@@ -38,7 +38,6 @@ export class IER12IntegrationScheduler extends BaseScheduler<GeneratedDateQueueI
job.data.generatedDate,
);
this.logger.log("IER 12 file generation completed.");
await this.cleanSchedulerQueueHistory();

return uploadResults.map(
(uploadResult) =>
Original file line number Diff line number Diff line change
@@ -47,7 +47,6 @@ export class ProcessNotificationScheduler extends BaseScheduler<ProcessNotificat
`Total notifications processed ${processNotificationResponse.notificationsProcessed}`,
`Total notifications successfully processed ${processNotificationResponse.notificationsSuccessfullyProcessed}`,
];
await this.cleanSchedulerQueueHistory();
return { summary: processSummaryResult } as QueueProcessSummaryResult;
}
}
Original file line number Diff line number Diff line change
@@ -39,7 +39,6 @@ export class SFASIntegrationScheduler extends BaseScheduler<void> {
const processingResults =
await this.sfasIntegrationProcessingService.process();
await summary.info("Completed processing SFAS integration files.");
await this.cleanSchedulerQueueHistory();
return processingResults.map((result) => ({
summary: result.summary,
success: result.success,
Original file line number Diff line number Diff line change
@@ -85,7 +85,6 @@ export class SIMSToSFASIntegrationScheduler extends BaseScheduler<void> {
} finally {
this.logger.logProcessSummary(processSummary);
await logProcessSummaryToJobLogger(processSummary, job);
await this.cleanSchedulerQueueHistory();
}
}

Original file line number Diff line number Diff line change
@@ -81,7 +81,6 @@ export class StudentApplicationNotificationsScheduler extends BaseScheduler<void
} finally {
this.logger.logProcessSummary(processSummary);
await logProcessSummaryToJobLogger(processSummary, job);
await this.cleanSchedulerQueueHistory();
}
}

Original file line number Diff line number Diff line change
@@ -66,7 +66,6 @@ export class AssessmentWorkflowEnqueuerScheduler extends BaseScheduler<void> {
} finally {
this.logger.logProcessSummary(processSummary);
await logProcessSummaryToJobLogger(processSummary, job);
await this.cleanSchedulerQueueHistory();
}
}

Original file line number Diff line number Diff line change
@@ -68,7 +68,6 @@ export class WorkflowQueueRetryScheduler extends BaseScheduler<AssessmentWorkflo
} finally {
this.logger.logProcessSummary(processSummary);
await logProcessSummaryToJobLogger(processSummary, job);
await this.cleanSchedulerQueueHistory();
}
}

17 changes: 5 additions & 12 deletions sources/packages/backend/libs/services/src/queue/queue.service.ts
Original file line number Diff line number Diff line change
@@ -78,6 +78,11 @@ export class QueueService {
cron: queueConfig.queueConfiguration.cron,
};
}
if (queueConfig.queueConfiguration.cleanUpPeriod) {
config.removeOnComplete = {
age: queueConfig.queueConfiguration.cleanUpPeriod / 1000,
};
}
return config;
}

@@ -91,18 +96,6 @@ export class QueueService {
return queueConfig.queueSetting;
}

/**
* Get queue clean up period.
* @param queueName queue name
* @returns queue clean up period.
*/
async getQueueCleanUpPeriod(
queueName: QueueNames,
): Promise<number | undefined> {
const queueConfig = await this.queueConfigurationDetails(queueName);
return queueConfig.queueConfiguration.cleanUpPeriod;
}

/**
* Get queue polling record limit.
* @param queueName queue name

0 comments on commit 9c39d9e

Please sign in to comment.