@@ -19,6 +19,10 @@ var uuid = require('uuid');
19
19
20
20
var commands = require ( './commands/' ) ;
21
21
22
+ Promise . config ( {
23
+ cancellation : true
24
+ } ) ;
25
+
22
26
/**
23
27
Gets or creates a new Queue with the given name.
24
28
@@ -140,7 +144,7 @@ var Queue = function Queue(name, url, opts){
140
144
this . eclient = createClient ( 'subscriber' , redisOpts ) ;
141
145
142
146
this . handlers = { } ;
143
- this . delayTimer = null ;
147
+ this . delayTimer = Promise . resolve ( ) ;
144
148
this . processing = [ ] ;
145
149
this . retrieving = 0 ;
146
150
@@ -246,33 +250,16 @@ Queue.prototype.off = Queue.prototype.removeListener;
246
250
247
251
Queue . prototype . _init = function ( name ) {
248
252
var _this = this ;
249
- var initializers = [ this . client , this . eclient ] . map ( function ( client ) {
250
- var _resolve , errorHandler ;
251
- return new Promise ( function ( resolve , reject ) {
252
- _resolve = resolve ;
253
- errorHandler = function ( err ) {
254
- if ( err . code !== 'ECONNREFUSED' ) {
255
- reject ( err ) ;
256
- }
257
- } ;
258
- client . once ( 'ready' , resolve ) ;
259
- client . on ( 'error' , errorHandler ) ;
260
- } ) . finally ( function ( ) {
261
- client . removeListener ( 'ready' , _resolve ) ;
262
- client . removeListener ( 'error' , errorHandler ) ;
263
- } ) ;
264
- } ) ;
265
253
266
- this . _initializing = Promise . all ( initializers ) . then ( function ( ) {
267
- return _this . eclient . psubscribe ( _this . toKey ( '' ) + '*' ) ;
268
- } ) . then ( function ( ) {
269
- return commands ( _this . client ) ;
270
- } ) . then ( function ( ) {
271
- debuglog ( name + ' queue ready' ) ;
272
- } , function ( err ) {
273
- _this . emit ( 'error' , err , 'Error initializing queue' ) ;
274
- throw err ;
275
- } ) ;
254
+ this . _initializing = _this . eclient . psubscribe ( _this . toKey ( '' ) + '*' )
255
+ . then ( function ( ) {
256
+ return commands ( _this . client ) ;
257
+ } ) . then ( function ( ) {
258
+ debuglog ( name + ' queue ready' ) ;
259
+ } , function ( err ) {
260
+ _this . emit ( 'error' , err , 'Error initializing queue' ) ;
261
+ throw err ;
262
+ } ) ;
276
263
} ;
277
264
278
265
Queue . prototype . _setupQueueEventListeners = function ( ) {
@@ -375,7 +362,7 @@ Queue.prototype.close = function( doNotWaitJobs ){
375
362
_ . each ( _this . errorRetryTimer , function ( timer ) {
376
363
clearTimeout ( timer ) ;
377
364
} ) ;
378
- clearTimeout ( _this . delayTimer ) ;
365
+ _this . delayTimer . cancel ( ) ;
379
366
clearInterval ( _this . guardianTimer ) ;
380
367
clearInterval ( _this . moveUnlockedJobsToWaitInterval ) ;
381
368
_this . timers . clearAll ( ) ;
@@ -423,6 +410,58 @@ Queue.prototype.process = function(name, concurrency, handler){
423
410
} ) ;
424
411
} ;
425
412
413
+ //
414
+ // This code will be called everytime a job is going to be processed if the job has a repeat option. (from delay -> active).
415
+ //
416
+ var parser = require ( 'cron-parser' ) ;
417
+
418
+ function nextRepeatableJob ( queue , name , data , opts ) {
419
+ var repeat = opts . repeat ;
420
+ var repeatKey = queue . toKey ( 'repeat' ) + ':' + name + ':' + repeat . cron ;
421
+
422
+ //
423
+ // Get millis for this repeatable job.
424
+ //
425
+ return queue . client . get ( repeatKey ) . then ( function ( millis ) {
426
+ if ( millis ) {
427
+ return parseInt ( millis ) ;
428
+ } else {
429
+ return Date . now ( ) ;
430
+ }
431
+ } ) . then ( function ( millis ) {
432
+ var interval = parser . parseExpression ( repeat . cron , _ . defaults ( {
433
+ currentDate : new Date ( millis )
434
+ } , repeat ) ) ;
435
+ var nextMillis ;
436
+ try {
437
+ nextMillis = interval . next ( ) ;
438
+ } catch ( e ) {
439
+ // Ignore error
440
+ }
441
+
442
+ if ( nextMillis ) {
443
+ nextMillis = nextMillis . getTime ( ) ;
444
+ var delay = nextMillis - millis ;
445
+
446
+ //
447
+ // Generate unique job id for this iteration.
448
+ //
449
+ var customId = 'repeat:' + name + ':' + nextMillis ;
450
+
451
+ //
452
+ // Set key and add job should be atomic.
453
+ //
454
+ return queue . client . set ( repeatKey , nextMillis ) . then ( function ( ) {
455
+ return Job . create ( queue , name , data , _ . extend ( _ . clone ( opts ) , {
456
+ jobId : customId ,
457
+ delay : delay < 0 ? 0 : delay ,
458
+ timestamp : Date . now ( )
459
+ } ) ) ;
460
+ } ) ;
461
+ }
462
+ } ) ;
463
+ } ;
464
+
426
465
Queue . prototype . start = function ( concurrency ) {
427
466
var _this = this ;
428
467
return this . run ( concurrency ) . catch ( function ( err ) {
@@ -449,6 +488,11 @@ Queue.prototype.setHandler = function(name, handler){
449
488
interface JobOptions
450
489
{
451
490
attempts: number;
491
+
492
+ repeat: {
493
+ tz?: string,
494
+ endDate?: Date | string | number
495
+ }
452
496
}
453
497
*/
454
498
@@ -459,7 +503,11 @@ interface JobOptions
459
503
@param opts: JobOptions Options for this job.
460
504
*/
461
505
Queue . prototype . add = function ( name , data , opts ) {
462
- return Job . create ( this , name , data , opts ) ;
506
+ if ( opts && opts . repeat ) {
507
+ return nextRepeatableJob ( this , name || DEFAULT_JOB_NAME , data , opts ) ;
508
+ } else {
509
+ return Job . create ( this , name , data , opts ) ;
510
+ }
463
511
} ;
464
512
465
513
/**
@@ -588,14 +636,15 @@ Queue.prototype.run = function(concurrency){
588
636
*/
589
637
Queue . prototype . updateDelayTimer = function ( newDelayedTimestamp ) {
590
638
var _this = this ;
639
+ newDelayedTimestamp = Math . round ( newDelayedTimestamp ) ;
591
640
if ( newDelayedTimestamp < _this . delayedTimestamp && newDelayedTimestamp < ( MAX_TIMEOUT_MS + Date . now ( ) ) ) {
592
- clearTimeout ( this . delayTimer ) ;
641
+ this . delayTimer . cancel ( ) ;
593
642
this . delayedTimestamp = newDelayedTimestamp ;
594
643
595
644
var nextDelayedJob = newDelayedTimestamp - Date . now ( ) ;
596
- nextDelayedJob = nextDelayedJob < 0 ? 0 : nextDelayedJob ;
645
+ var delay = nextDelayedJob <= 0 ? Promise . resolve ( ) : Promise . delay ( nextDelayedJob ) ;
597
646
598
- this . delayTimer = setTimeout ( function ( ) {
647
+ this . delayTimer = delay . then ( function ( ) {
599
648
scripts . updateDelaySet ( _this , _this . delayedTimestamp ) . then ( function ( nextTimestamp ) {
600
649
if ( nextTimestamp ) {
601
650
nextTimestamp = nextTimestamp < Date . now ( ) ? Date . now ( ) : nextTimestamp ;
@@ -607,7 +656,7 @@ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){
607
656
_this . emit ( 'error' , err , 'Error updating the delay timer' ) ;
608
657
} ) ;
609
658
_this . delayedTimestamp = Number . MAX_VALUE ;
610
- } , nextDelayedJob ) ;
659
+ } ) ;
611
660
}
612
661
} ;
613
662
@@ -791,7 +840,13 @@ Queue.prototype.getNextJob = function() {
791
840
792
841
return scripts . moveToActive ( this ) . spread ( function ( jobData , jobId ) {
793
842
if ( jobData ) {
794
- return Job . fromData ( _this , jobData , jobId ) ;
843
+ var job = Job . fromData ( _this , jobData , jobId ) ;
844
+ if ( job . opts . repeat ) {
845
+ return nextRepeatableJob ( _this , job . name , job . data , job . opts ) . then ( function ( ) {
846
+ return job ;
847
+ } ) ;
848
+ }
849
+ return job ;
795
850
} else {
796
851
return newJobs ;
797
852
}
0 commit comments