Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#2181 - Queued Assessments - Assessment Workflow Enqueuer Scheduler - Cancel Assessment #2375

Merged
merged 17 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
ApplicationData,
getUserFullNameLikeSearch,
transformToApplicationEntitySortField,
StudentAssessmentStatus,
} from "@sims/sims-db";
import { StudentFileService } from "../student-file/student-file.service";
import {
Expand All @@ -33,7 +34,7 @@ import {
PaginatedResults,
offeringBelongToProgramYear,
} from "../../utilities";
import { CustomNamedError, QueueNames } from "@sims/utilities";
import { CustomNamedError } from "@sims/utilities";
import {
SFASApplicationService,
SFASPartTimeApplicationsService,
Expand All @@ -52,9 +53,6 @@ import {
import { SequenceControlService } from "@sims/services";
import { ConfigService } from "@sims/utilities/config";
import { NotificationActionsService } from "@sims/services/notifications";
import { InjectQueue } from "@nestjs/bull";
import { Queue } from "bull";
import { CancelAssessmentQueueInDTO } from "@sims/services/queue";
import { InstitutionLocationService } from "../institution-location/institution-location.service";

export const APPLICATION_DRAFT_NOT_FOUND = "APPLICATION_DRAFT_NOT_FOUND";
Expand Down Expand Up @@ -82,8 +80,6 @@ export class ApplicationService extends RecordDataModelService<Application> {
private readonly offeringService: EducationProgramOfferingService,
private readonly notificationActionsService: NotificationActionsService,
private readonly institutionLocationService: InstitutionLocationService,
@InjectQueue(QueueNames.CancelApplicationAssessment)
private readonly cancelAssessmentQueue: Queue<CancelAssessmentQueueInDTO>,
) {
super(dataSource.getRepository(Application));
}
Expand Down Expand Up @@ -271,14 +267,16 @@ export class ApplicationService extends RecordDataModelService<Application> {
newApplication.creator = auditUser;
newApplication.studentAssessments = [originalAssessment];
newApplication.currentAssessment = originalAssessment;

// Updates the current assessment status to cancellation requested.
application.currentAssessment.studentAssessmentStatus =
StudentAssessmentStatus.CancellationRequested;
application.currentAssessment.modifier = auditUser;
application.currentAssessment.studentAssessmentStatusUpdatedOn = now;
application.currentAssessment.updatedAt = now;

await applicationRepository.save([application, newApplication]);
});
// Deleting the existing workflow, if there is one.
if (application.currentAssessment.assessmentWorkflowId) {
await this.cancelAssessmentQueue.add({
assessmentId: application.currentAssessment.id,
});
}

return { application, createdAssessment: originalAssessment };
}
Expand Down Expand Up @@ -999,18 +997,20 @@ export class ApplicationService extends RecordDataModelService<Application> {
}
// Updates the application status to cancelled.
const now = new Date();
const auditUser = { id: auditUserId } as User;
application.applicationStatus = ApplicationStatus.Cancelled;
application.applicationStatusUpdatedOn = now;
application.modifier = { id: auditUserId } as User;
application.modifier = auditUser;
application.updatedAt = now;

// Updates the current assessment status to cancellation requested.
application.currentAssessment.studentAssessmentStatus =
StudentAssessmentStatus.CancellationRequested;
application.currentAssessment.modifier = auditUser;
application.currentAssessment.studentAssessmentStatusUpdatedOn = now;
application.currentAssessment.updatedAt = now;

await this.repo.save(application);
// Delete workflow and rollback overawards if the workflow started.
// Workflow doest not exists for draft or submitted application, for instance.
if (application.currentAssessment?.assessmentWorkflowId) {
await this.cancelAssessmentQueue.add({
assessmentId: application.currentAssessment.id,
});
}
return application;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
DatabaseConstraintNames,
PostgresDriverError,
mapFromRawAndEntities,
StudentAssessmentStatus,
} from "@sims/sims-db";
import {
DataSource,
Expand All @@ -45,7 +46,6 @@ import {
dateDifference,
decimalRound,
FieldSortOrder,
QueueNames,
} from "@sims/utilities";
import {
OFFERING_INVALID_OPERATION_IN_THE_CURRENT_STATE,
Expand All @@ -64,20 +64,11 @@ import {
import { EducationProgramOfferingValidationService } from "./education-program-offering-validation.service";
import * as os from "os";
import { LoggerService, InjectLogger } from "@sims/utilities/logger";
import { WorkflowClientService } from "@sims/services";
import { CreateProcessInstanceResponse } from "zeebe-node";
import { Job, Queue } from "bull";
import { CancelAssessmentQueueInDTO } from "@sims/services/queue";
import { InjectQueue } from "@nestjs/bull";

@Injectable()
export class EducationProgramOfferingService extends RecordDataModelService<EducationProgramOffering> {
constructor(
private readonly dataSource: DataSource,
private readonly workflowClientService: WorkflowClientService,
private readonly offeringValidationService: EducationProgramOfferingValidationService,
@InjectQueue(QueueNames.CancelApplicationAssessment)
private readonly cancelAssessmentQueue: Queue<CancelAssessmentQueueInDTO>,
) {
super(dataSource.getRepository(EducationProgramOffering));
}
Expand Down Expand Up @@ -1106,6 +1097,14 @@ export class EducationProgramOfferingService extends RecordDataModelService<Educ
// then set the application as cancelled as it cannot be re-assessed.
else {
application.applicationStatus = ApplicationStatus.Cancelled;

// Updates the current assessment status to cancellation required.
application.currentAssessment.studentAssessmentStatus =
StudentAssessmentStatus.CancellationRequested;
application.currentAssessment.modifier = auditUser;
application.currentAssessment.studentAssessmentStatusUpdatedOn =
currentDate;
application.currentAssessment.updatedAt = currentDate;
}

application.modifier = auditUser;
Expand Down Expand Up @@ -1152,46 +1151,6 @@ export class EducationProgramOfferingService extends RecordDataModelService<Educ
.save(applications);
}
});

// Once the impacted applications are updated with new current assessment
// start the assessment workflows and delete the existing workflow instances.
if (applications?.length > 0) {
await this.startOfferingChangeAssessments(applications);
}
}

/**
* For an offering change start the assessment workflows for all new assessments
* and delete the existing workflow instances of previous assessments.
* @param applications applications impacted by offering change.
*/
private async startOfferingChangeAssessments(
applications: ApplicationAssessmentSummary[],
): Promise<void> {
const promises: Promise<Job | CreateProcessInstanceResponse>[] = [];
// Used to limit the number of asynchronous operations
// that will start at the same time based on length of cpus.
// TODO: Currently the parallel processing is limited logical CPU core count but this approach
// TODO: needs to be revisited.
const maxPromisesAllowed = os.cpus().length;
for (const application of applications) {
// When the assessment data is populated, the workflow is complete.
// Only running workflow instances can be deleted.
if (application.assessmentWorkflowId && !application.hasAssessmentData) {
const deleteAssessmentPromise = this.cancelAssessmentQueue.add({
assessmentId: application.assessmentId,
});
promises.push(deleteAssessmentPromise);
}
if (promises.length >= maxPromisesAllowed) {
// Waits for promises to be process when it reaches maximum allowable parallel
// count.
await Promise.all(promises);
promises.splice(0, promises.length);
}
}
// Processing any pending promise if not completed.
await Promise.all(promises);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ describe(
`Cancelling application assessment id ${studentAssessment.id}`,
`Found workflow id ${workflowInstanceId}.`,
"Workflow instance successfully cancelled.",
"Changing student assessment status to Cancelled.",
"Rolling back overawards, if any.",
"Assessment cancelled with success.",
]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ import {
QueueProcessSummary,
QueueProcessSummaryResult,
} from "../models/processors.models";
import { ApplicationStatus } from "@sims/sims-db";
import {
ApplicationStatus,
StudentAssessment,
StudentAssessmentStatus,
} from "@sims/sims-db";

/**
* Process the workflow cancellation.
Expand Down Expand Up @@ -100,6 +104,14 @@ export class CancelApplicationAssessmentProcessor {
);
}
return this.dataSource.transaction(async (transactionEntityManager) => {
await summary.info(
`Changing student assessment status to ${StudentAssessmentStatus.Cancelled}.`,
);
await transactionEntityManager
.getRepository(StudentAssessment)
.update(assessment.id, {
studentAssessmentStatus: StudentAssessmentStatus.Cancelled,
});
// Overawards rollback.
// This method is safe to be called independently of the workflow state but it makes sense only after the
// application moves from the 'In progress' status when the disbursements are generated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ import {
import { Not } from "typeorm";
import {
Application,
ApplicationStatus,
AssessmentTriggerType,
StudentAssessmentStatus,
User,
} from "@sims/sims-db";
import { StartAssessmentQueueInDTO } from "@sims/services/queue";
import {
CancelAssessmentQueueInDTO,
StartAssessmentQueueInDTO,
} from "@sims/services/queue";

describe(
describeProcessorRootTest(QueueNames.AssessmentWorkflowEnqueuer),
Expand All @@ -31,6 +35,7 @@ describe(
let db: E2EDataSources;
let processor: AssessmentWorkflowEnqueuerScheduler;
let startApplicationAssessmentQueueMock: Queue<StartAssessmentQueueInDTO>;
let cancelApplicationAssessmentQueueMock: Queue<CancelAssessmentQueueInDTO>;
let auditUser: User;

beforeAll(async () => {
Expand All @@ -42,6 +47,10 @@ describe(
startApplicationAssessmentQueueMock = app.get(
getQueueProviderName(QueueNames.StartApplicationAssessment),
);
// Mocked CancelApplicationAssessment queue.
cancelApplicationAssessmentQueueMock = app.get(
getQueueProviderName(QueueNames.CancelApplicationAssessment),
);
// Processor under test.
processor = app.get(AssessmentWorkflowEnqueuerScheduler);
});
Expand Down Expand Up @@ -186,6 +195,73 @@ describe(
expect(startApplicationAssessmentQueueMock.add).not.toBeCalled();
});

it(`Should queue the assessment cancellation when an application has one assessment with status '${StudentAssessmentStatus.CancellationRequested}'.`, async () => {
// Arrange

// Application with a cancellation requested assessment.
const application = await createDefaultApplication(
StudentAssessmentStatus.CancellationRequested,
);
application.applicationStatus = ApplicationStatus.Overwritten;
await db.application.save(application);

// Queued job.
const job = createMock<Job<void>>();

// Act
await processor.enqueueAssessmentOperations(job);

// Assert
// Load application and related assessments for assertion.
const updatedApplication = await loadApplicationDataForAssertions(
application.id,
);

// Assert item was added to the queue.
const queueData = {
assessmentId: updatedApplication.currentAssessment.id,
} as CancelAssessmentQueueInDTO;
expect(cancelApplicationAssessmentQueueMock.add).toBeCalledWith(
queueData,
);

expect(updatedApplication.currentAssessment.studentAssessmentStatus).toBe(
StudentAssessmentStatus.CancellationQueued,
);
});

it(`Should not queue the assessment cancellation when an application has one assessment with status '${StudentAssessmentStatus.CancellationRequested}' but has another assessment with status '${StudentAssessmentStatus.CancellationQueued}'.`, async () => {
// Arrange

// Application with a cancellation queued assessment.
const application = await createDefaultApplication(
StudentAssessmentStatus.CancellationQueued,
);
const submittedAssessment = createFakeStudentAssessment({
application,
auditUser,
});
submittedAssessment.studentAssessmentStatus =
StudentAssessmentStatus.CancellationRequested;
await db.studentAssessment.save(submittedAssessment);

application.applicationStatus = ApplicationStatus.Overwritten;
application.studentAssessments = [
application.currentAssessment,
submittedAssessment,
];
await db.application.save(application);
andrewsignori-aot marked this conversation as resolved.
Show resolved Hide resolved

// Queued job.
const job = createMock<Job<void>>();

// Act
await processor.enqueueAssessmentOperations(job);

// Assert
expect(cancelApplicationAssessmentQueueMock.add).not.toBeCalled();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

});

/**
* Get a default application with only one original assessment to test
* multiple positive and negative scenarios ensuring that the only variant
Expand Down Expand Up @@ -220,6 +296,7 @@ describe(
id: true,
currentAssessment: {
id: true,
studentAssessmentStatus: true,
},
currentProcessingAssessment: {
id: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,19 @@ export class AssessmentWorkflowEnqueuerScheduler extends BaseScheduler<void> {
processSummary.info(
"Checking application assessments to be queued for start.",
);
// Process summary to be populated by the enqueueStartAssessmentWorkflows.
// In case an unexpected error happen the finally block will still be able to
// output the partial information captured by the processSummary.
const serviceProcessSummary = new ProcessSummary();
processSummary.children(serviceProcessSummary);
await this.workflowEnqueuerService.enqueueStartAssessmentWorkflows(
serviceProcessSummary,
// Check for applications with assessments to be cancelled.
await this.executeEnqueueProcess(
processSummary,
this.workflowEnqueuerService.enqueueCancelAssessmentWorkflows.bind(
this.workflowEnqueuerService,
),
);
// Check for applications with assessments to be started.
await this.executeEnqueueProcess(
processSummary,
this.workflowEnqueuerService.enqueueStartAssessmentWorkflows.bind(
this.workflowEnqueuerService,
),
);
return getSuccessMessageWithAttentionCheck(
"Process finalized with success.",
Expand All @@ -64,6 +70,29 @@ export class AssessmentWorkflowEnqueuerScheduler extends BaseScheduler<void> {
}
}

/**
* Enqueues the process and creates a new process summary for the queue process.
* @param parentProcessSummary parent process summary.
* @param enqueueProcess enqueue process function to be called.
*/
private async executeEnqueueProcess(
andrewsignori-aot marked this conversation as resolved.
Show resolved Hide resolved
parentProcessSummary: ProcessSummary,
enqueueProcess: (summary: ProcessSummary) => Promise<void>,
): Promise<void> {
try {
// Process summary to be populated by each enqueueing workflow call.
// In case an unexpected error happen the finally block will still be able to
// output the partial information captured by the processSummary.
const serviceProcessSummary = new ProcessSummary();
parentProcessSummary.children(serviceProcessSummary);
await enqueueProcess(serviceProcessSummary);
} catch (error: unknown) {
const errorMessage =
"Unexpected error while enqueueing start assessment workflows.";
parentProcessSummary.error(errorMessage, error);
}
}

@InjectLogger()
logger: LoggerService;
}
Loading