Skip to content
99 changes: 90 additions & 9 deletions app/apps/server/bridges/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ function _callProcessor(processor) {
return (job) => processor(job?.attrs?.data || {});
}

/**
* Provides the Apps Engine with task scheduling capabilities
* It uses {@link agenda:github.com/agenda/agenda} as backend
*/
export class AppSchedulerBridge {
constructor(orch) {
this.orch = orch;
Expand All @@ -18,6 +22,34 @@ export class AppSchedulerBridge {
this.isConnected = false;
}

/**
* Entity that will be run in a job
* @typedef {Object} Processor
* @property {string} id The processor's identifier
* @property {function} processor The function that will be run on a given schedule
* @property {IOnetimeStartup|IRecurrentStartup} [startupSetting] If provided, the processor will be configured with the setting as soon as it gets registered

* Processor setting for running once after being registered
* @typedef {Object} IOnetimeStartup
* @property {string} type=onetime
* @property {string} when When the processor will be executed
* @property {Object} [data] An optional object that is passed to the processor
*
* Processor setting for running recurringly after being registered
* @typedef {Object} IRecurrentStartup
* @property {string} type=recurring
* @property {string} interval When the processor will be re executed
* @property {Object} [data] An optional object that is passed to the processor
*/

/**
* Register processors that can be scheduled to run
*
* @param {Array.<Processor>} processors An array of processors
* @param {string} appId
*
* @returns Promise<void>
*/
async registerProcessors(processors = [], appId) {
const runAfterRegister = [];
this.orch.debugLog(`The App ${ appId } is registering job processors`, processors);
Expand All @@ -30,9 +62,10 @@ export class AppSchedulerBridge {
runAfterRegister.push(this.scheduleOnceAfterRegister({ id, when: startupSetting.when, data: startupSetting.data }, appId));
break;
case StartupType.RECURRING:
runAfterRegister.push(this.scheduleRecurring({ id, interval: startupSetting.interval, data: startupSetting.data }, appId));
runAfterRegister.push(this.scheduleRecurring({ id, interval: startupSetting.interval, skipImmediate: startupSetting.skipImmediate, data: startupSetting.data }, appId));
break;
default:
this.orch.getRocketChatLogger().error(`Invalid startup setting type (${ startupSetting.type }) for the processor ${ id }`);
break;
}
}
Expand All @@ -43,10 +76,25 @@ export class AppSchedulerBridge {
}
}

/**
* Schedules a registered processor to run _once_.
*
* @param {Object} job
* @param {string} job.id The processor's id
* @param {string} job.when When the processor will be executed
* @param {Object} [job.data] An optional object that is passed to the processor
* @param {string} appId
*
* @returns Promise<void>
*/
async scheduleOnce(job, appId) {
this.orch.debugLog(`The App ${ appId } is scheduling an onetime job`, job);
await this.startScheduler();
await this.scheduler.schedule(job.when, job.id, job.data || {});
try {
await this.startScheduler();
await this.scheduler.schedule(job.when, job.id, job.data || {});
} catch (e) {
this.orch.getRocketChatLogger().error(e);
}
}

async scheduleOnceAfterRegister(job, appId) {
Expand All @@ -56,30 +104,63 @@ export class AppSchedulerBridge {
}
}

async scheduleRecurring(job, appId) {
this.orch.debugLog(`The App ${ appId } is scheduling a recurring job`, job);
await this.startScheduler();
await this.scheduler.every(job.interval, job.id, job.data || {});
/**
* Schedules a registered processor to run recurrently according to a given interval
*
* @param {Object} job
* @param {string} job.id The processor's id
* @param {string} job.interval When the processor will be re executed
* @param {boolean} job.skipImmediate=false Whether to let the first iteration to execute as soon as the task is registered
* @param {Object} [job.data] An optional object that is passed to the processor
* @param {string} appId
*
* @returns Promise<void>
*/
async scheduleRecurring({ id, interval, skipImmediate = false, data }, appId) {
this.orch.debugLog(`The App ${ appId } is scheduling a recurring job`, id);
try {
await this.startScheduler();
const job = this.scheduler.create(id, data || {});
job.repeatEvery(interval, { skipImmediate });
await job.save();
} catch (e) {
this.orch.getRocketChatLogger().error(e);
}
}

/**
* Cancels a running job given its jobId
*
* @param {string} jobId
* @param {string} appId
*
* @returns Promise<void>
*/
async cancelJob(jobId, appId) {
this.orch.debugLog(`The App ${ appId } is canceling a job`, jobId);
await this.startScheduler();
try {
await this.scheduler.cancel({ name: jobId });
} catch (e) {
console.error(e);
this.orch.getRocketChatLogger().error(e);
}
}

/**
* Cancels all the running jobs from the app
*
* @param {string} appId
*
* @returns Promise<void>
*/
async cancelAllJobs(appId) {
this.orch.debugLog(`Canceling all jobs of App ${ appId }`);
await this.startScheduler();
const matcher = new RegExp(`_${ appId }$`);
try {
await this.scheduler.cancel({ name: { $regex: matcher } });
} catch (e) {
console.error(e);
this.orch.getRocketChatLogger().error(e);
}
}

Expand Down