diff --git a/app/apps/server/bridges/scheduler.js b/app/apps/server/bridges/scheduler.js index c353641a06336..f59f61f0640d7 100644 --- a/app/apps/server/bridges/scheduler.js +++ b/app/apps/server/bridges/scheduler.js @@ -1,8 +1,9 @@ import Agenda from 'agenda'; import { MongoInternals } from 'meteor/mongo'; +import { StartupType } from '@rocket.chat/apps-engine/definition/scheduler'; function _callProcessor(processor) { - return (job) => processor(job.attrs.data); + return (job) => processor(job?.attrs?.data || {}); } export class AppSchedulerBridge { @@ -10,14 +11,36 @@ export class AppSchedulerBridge { this.orch = orch; this.scheduler = new Agenda({ mongo: MongoInternals.defaultRemoteCollectionDriver().mongo.client.db(), - collection: 'rocketchat_agenda_jobs', + db: { collection: 'rocketchat_apps_scheduler' }, + // this ensures the same job doesn't get executed multiple times in a cluster + defaultConcurrency: 1, }); this.isConnected = false; } async registerProcessors(processors = [], appId) { + const runAfterRegister = []; this.orch.debugLog(`The App ${ appId } is registering job processors`, processors); - processors.forEach(({ id, processor }) => this.scheduler.define(id, _callProcessor(processor))); + processors.forEach(({ id, processor, startupSetting }) => { + this.scheduler.define(id, _callProcessor(processor)); + + if (startupSetting) { + switch (startupSetting.type) { + case StartupType.ONETIME: + runAfterRegister.push(this.scheduleOnceAfterRegister({ id, when: startupSetting.when, data: startupSetting.data }, appId)); + break; + case StartupType.RECURRING: + runAfterRegister.push(this.scheduleRecurring({ id, interval: startupSetting.interval, data: startupSetting.data }, appId)); + break; + default: + break; + } + } + }); + + if (runAfterRegister.length) { + await Promise.all(runAfterRegister); + } } async scheduleOnce(job, appId) { @@ -26,10 +49,17 @@ export class AppSchedulerBridge { await this.scheduler.schedule(job.when, job.id, job.data || {}); } + async scheduleOnceAfterRegister(job, appId) { + const scheduledJobs = await this.scheduler.jobs({ name: job.id, type: 'normal' }); + if (!scheduledJobs.length) { + await this.scheduleOnce(job, appId); + } + } + async scheduleRecurring(job, appId) { this.orch.debugLog(`The App ${ appId } is scheduling a recurring job`, job); await this.startScheduler(); - await this.scheduler.every(job.cron, job.id, job.data || {}); + await this.scheduler.every(job.interval, job.id, job.data || {}); } async cancelJob(jobId, appId) { @@ -45,7 +75,7 @@ export class AppSchedulerBridge { async cancelAllJobs(appId) { this.orch.debugLog(`Canceling all jobs of App ${ appId }`); await this.startScheduler(); - const matcher = new RegExp(`^_${ appId }`); + const matcher = new RegExp(`_${ appId }$`); try { await this.scheduler.cancel({ name: { $regex: matcher } }); } catch (e) { diff --git a/package-lock.json b/package-lock.json index bd7b1c07939ee..7c50d7b494ad9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5028,9 +5028,9 @@ } }, "@rocket.chat/apps-engine": { - "version": "1.19.0-beta.4014", - "resolved": "https://registry.npmjs.org/@rocket.chat/apps-engine/-/apps-engine-1.19.0-beta.4014.tgz", - "integrity": "sha512-7SnN00qakhHDbWkD2EIs825VNIJR67YV9qR0Z3SJSjQy5uJwIJiWEa86GJEO3Mf/jkclhTTIR4Ncpy2jn43HfQ==", + "version": "1.19.0-beta.4086", + "resolved": "https://registry.npmjs.org/@rocket.chat/apps-engine/-/apps-engine-1.19.0-beta.4086.tgz", + "integrity": "sha512-w+zl2yVFYwoD4ZwFXVClD3YX84rb97ntgEIwTfqmALFU87xZ8/ymhytGGtdioAtUrxb3iWKfQlGZ64Jj0BOFEA==", "requires": { "adm-zip": "^0.4.9", "cryptiles": "^4.1.3", @@ -5038,13 +5038,6 @@ "semver": "^5.5.0", "stack-trace": "0.0.10", "uuid": "^3.2.1" - }, - "dependencies": { - "adm-zip": { - "version": "0.4.16", - "resolved": "https://registry.npmjs.org/adm-zip/-/adm-zip-0.4.16.tgz", - "integrity": "sha512-TFi4HBKSGfIKsK5YCkKaaFG2m4PEDyViZmEwof3MTIgzimHLto6muaHVpbrljdIvIrFZzEq/p4nafOeLcYegrg==" - } } }, "@rocket.chat/css-in-js": { @@ -18314,7 +18307,7 @@ }, "minimist": { "version": "0.0.8", - "resolved": false, + "resolved": "https://registry.npmjs.org/minimist/-/minimist-0.0.8.tgz", "integrity": "sha1-hX/Kv8M5fSYluCKCYuhqp6ARsF0=", "dev": true, "optional": true @@ -18342,7 +18335,7 @@ }, "mkdirp": { "version": "0.5.1", - "resolved": false, + "resolved": "https://registry.npmjs.org/mkdirp/-/mkdirp-0.5.1.tgz", "integrity": "sha1-MAV0OOrGz3+MR2fzhkjWaX11yQM=", "dev": true, "optional": true, @@ -18515,7 +18508,7 @@ "dependencies": { "minimist": { "version": "1.2.0", - "resolved": false, + "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.0.tgz", "integrity": "sha1-o1AIsg9BOD7sH7kU9M1d95omQoQ=", "dev": true, "optional": true diff --git a/package.json b/package.json index 85f3d676b5624..3b1c5a9d775d1 100644 --- a/package.json +++ b/package.json @@ -136,7 +136,7 @@ "@nivo/heatmap": "^0.61.0", "@nivo/line": "^0.61.1", "@nivo/pie": "^0.61.1", - "@rocket.chat/apps-engine": "1.19.0-beta.4014", + "@rocket.chat/apps-engine": "1.19.0-beta.4086", "@rocket.chat/css-in-js": "^0.17.2", "@rocket.chat/emitter": "^0.17.2", "@rocket.chat/fuselage": "^0.17.2",