Skip to content

Commit

Permalink
feat(repeat): deprecate immediately on job scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Oct 11, 2024
1 parent 07012e6 commit ed047f7
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 47 deletions.
42 changes: 20 additions & 22 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ export class JobScheduler extends QueueBase {
super(name, opts, Connection);

this.repeatStrategy =
(opts.settings && opts.settings.repeatStrategy) || getNextMillis;
(opts.settings && opts.settings.repeatStrategy) || defaultRepeatStrategy;
}

async upsertJobScheduler<T = any, R = any, N extends string = string>(
jobSchedulerId: string,
repeatOpts: Omit<RepeatOptions, 'key'>,
repeatOpts: Omit<RepeatOptions, 'key' | 'prevMillis' | 'offset'>,
jobName: N,
jobData: T,
opts: Omit<JobsOptions, 'jobId' | 'repeat' | 'delay'>,
Expand Down Expand Up @@ -69,7 +69,6 @@ export class JobScheduler extends QueueBase {
}

const prevMillis = opts.prevMillis || 0;
now = prevMillis < now ? now : prevMillis;

// Check if we have a start date for the repeatable job
const { startDate } = repeatOpts;
Expand All @@ -78,12 +77,18 @@ export class JobScheduler extends QueueBase {
now = startMillis > now ? startMillis : now;
}

const nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);
let nextMillis;
if (every) {
nextMillis = prevMillis + every;

if (nextMillis < now) {
nextMillis = now;
}
} else if (pattern) {
now = prevMillis < now ? now : prevMillis;
nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);
}

const hasImmediately = Boolean(
(every || pattern) && repeatOpts.immediately,
);
const offset = hasImmediately && every ? now - nextMillis : undefined;
if (nextMillis) {
if (override) {
await this.scripts.addJobScheduler(jobSchedulerId, nextMillis, {
Expand All @@ -100,16 +105,15 @@ export class JobScheduler extends QueueBase {
);
}

const { immediately, ...filteredRepeatOpts } = repeatOpts;
const { startDate, ...filteredRepeatOpts } = repeatOpts;

return this.createNextJob<T, R, N>(
jobName,
nextMillis,
jobSchedulerId,
{ ...opts, repeat: { offset, ...filteredRepeatOpts } },
{ ...opts, repeat: filteredRepeatOpts },
jobData,
iterationCount,
hasImmediately,
);
}
}
Expand All @@ -121,24 +125,22 @@ export class JobScheduler extends QueueBase {
opts: JobsOptions,
data: T,
currentCount: number,
hasImmediately: boolean,
) {
//
// Generate unique job id for this iteration.
//
const jobId = this.getSchedulerNextJobId({
jobSchedulerId: jobSchedulerId,
jobSchedulerId,
nextMillis,
});

const now = Date.now();
const delay =
nextMillis + (opts.repeat.offset ? opts.repeat.offset : 0) - now;
const delay = nextMillis - now;

const mergedOpts = {
...opts,
jobId,
delay: delay < 0 || hasImmediately ? 0 : delay,
delay: delay < 0 ? 0 : delay,
timestamp: now,
prevMillis: nextMillis,
repeatJobKey: jobSchedulerId,
Expand Down Expand Up @@ -230,15 +232,11 @@ export class JobScheduler extends QueueBase {
}
}

export const getNextMillis = (
export const defaultRepeatStrategy = (
millis: number,
opts: RepeatOptions,
): number | undefined => {
const { every, pattern } = opts;

if (every) {
return Math.floor(millis / every) * every + (opts.immediately ? 0 : every);
}
const { pattern } = opts;

const currentDate = new Date(millis);
const interval = parseExpression(pattern, {
Expand Down
1 change: 0 additions & 1 deletion src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ export class Job<
this.opts = Object.assign(
{
attempts: 0,
delay: 0,
},
restOpts,
);
Expand Down
2 changes: 1 addition & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ export class Scripts {

if (parentOpts.waitChildrenKey) {
result = await this.addParentJob(client, job, encodedOpts, args);
} else if (opts.delay) {
} else if (typeof opts.delay == 'number') {
result = await this.addDelayedJob(client, job, encodedOpts, args);
} else if (opts.priority) {
result = await this.addPrioritizedJob(client, job, encodedOpts, args);
Expand Down
21 changes: 21 additions & 0 deletions src/interfaces/repeat-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,42 @@ export interface RepeatOptions extends Omit<ParserOptions, 'iterator'> {
* Number of times the job should repeat at max.
*/
limit?: number;

/**
* Repeat after this amount of milliseconds
* (`pattern` setting cannot be used together with this setting.)
*/
every?: number;

/**
* Repeated job should start right now
* ( work only with every settings)
*
* @deprecated
*
*/
immediately?: boolean;

/**
* The start value for the repeat iteration count.
*/
count?: number;

/**
* Internal property to store the previous time the job was executed.
*/
prevMillis?: number;

/**
* Internal property to store the offset to apply to the next iteration.
*
* @deprecated
*/
offset?: number;

/**
* Internal property to store de job id
* @deprecated
*/
jobId?: string;
}
65 changes: 42 additions & 23 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -712,24 +712,27 @@ describe('Job Scheduler', function () {
},
{ connection, prefix },
);
const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {});

let prev: Job;
let counter = 0;

const completing = new Promise<void>(resolve => {
const completing = new Promise<void>((resolve, reject) => {
worker.on('completed', async job => {
if (prev && counter === 1) {
expect(prev.timestamp).to.be.lt(job.timestamp);
expect(job.timestamp - prev.timestamp).to.be.gte(100);
} else if (prev) {
expect(prev.timestamp).to.be.lt(job.timestamp);
expect(job.timestamp - prev.timestamp).to.be.gte(2000);
}
prev = job;
counter++;
if (counter === 5) {
resolve();
try {
if (prev && counter === 1) {
expect(prev.timestamp).to.be.lt(job.timestamp);
expect(job.timestamp - prev.timestamp).to.be.gte(100);
} else if (prev) {
expect(prev.timestamp).to.be.lt(job.timestamp);
expect(job.timestamp - prev.timestamp).to.be.gte(2000);
}
prev = job;
counter++;
if (counter === 5) {
resolve();
}
} catch (err) {
reject(err);
}
});
});
Expand All @@ -743,11 +746,11 @@ describe('Job Scheduler', function () {
{ data: { foo: 'bar' } },
);

this.clock.tick(100);
this.clock.tick(nextTick);

await completing;

await worker.close();
delayStub.restore();
});

it('should start immediately even after removing the job scheduler and adding it again', async function () {
Expand Down Expand Up @@ -782,7 +785,7 @@ describe('Job Scheduler', function () {
{ data: { foo: 'bar' } },
);

this.clock.tick(1265);
this.clock.tick(nextTick);

await processing1;

Expand Down Expand Up @@ -1611,31 +1614,43 @@ describe('Job Scheduler', function () {
this.clock.setSystemTime(initialDate);

// Set the next tick (repeat interval) and the startDate in the future
const nextTick = 2 * ONE_SECOND + 500;
const nextTick = ONE_DAY;
const startDate = new Date('2024-01-01 10:00:10'); // 10 seconds in the future

const expectedDates = [
new Date('2024-01-01 10:00:10'),
new Date('2024-01-02 10:00:10'),
new Date('2024-01-03 10:00:10'),
new Date('2024-01-04 10:00:10'),
new Date('2024-01-05 10:00:10'),
];

let jobIteration = 0;

const worker = new Worker(
queueName,
async () => {
async _job => {
this.clock.tick(nextTick);
},
{ autorun: false, connection, prefix },
);
const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {});

// Schedule the job with the 'every' interval and a future startDate
await queue.upsertJobScheduler(
const job = await queue.upsertJobScheduler(
'test',
{
every: 2000, // every 2 seconds
every: ONE_DAY,
startDate,
},
{ data: { foo: 'bar' } },
);

expect(job).to.be.ok;
expect(job?.delay).to.be.eql(10000);

// Simulate the passage of time up to the startDate
const startDateDelay = startDate.getTime() - initialDate.getTime();
this.clock.tick(startDateDelay + nextTick);
this.clock.tick(startDateDelay);

let prev: Job;
let counter = 0;
Expand All @@ -1646,6 +1661,11 @@ describe('Job Scheduler', function () {
try {
if (prev) {
expect(prev.timestamp).to.be.lt(job.timestamp);

expect(new Date(job.processedOn!)).to.be.eql(
expectedDates[++jobIteration],
);

expect(job.timestamp - prev.timestamp).to.be.gte(2000); // Ensure it's repeating every 2 seconds
}
prev = job;
Expand All @@ -1663,7 +1683,6 @@ describe('Job Scheduler', function () {

await completing;
await worker.close();
delayStub.restore();
});

it('should throw an error when using .pattern and .every simultaneously', async function () {
Expand Down

0 comments on commit ed047f7

Please sign in to comment.