Skip to content

Commit

Permalink
Add lifecycle events
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon Grondin committed Jun 1, 2019
1 parent 558bce3 commit f819836
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 35 deletions.
28 changes: 18 additions & 10 deletions es5.js
Original file line number Diff line number Diff line change
Expand Up @@ -1557,30 +1557,41 @@
}, {
key: "doReceive",
value: function doReceive() {
var eventInfo;

this._states.start(this.options.id);

return this.Events.trigger("debug", "Queueing ".concat(this.options.id), {
eventInfo = {
args: this.args,
options: this.options
};
return this.Events.trigger("received", "Received ".concat(this.options.id), {
args: this.args,
options: this.options
});
}
}, {
key: "doQueue",
value: function doQueue(reachedHWM, blocked) {
var eventInfo;

this._assertStatus("RECEIVED");

this._states.next(this.options.id);

return this.Events.trigger("debug", "Queued ".concat(this.options.id), {
eventInfo = {
args: this.args,
options: this.options,
reachedHWM: reachedHWM,
blocked: blocked
});
};
return this.Events.trigger("queued", "Queued ".concat(this.options.id), eventInfo);
}
}, {
key: "doRun",
value: function doRun() {
var eventInfo;

if (this.retryCount === 0) {
this._assertStatus("QUEUED");

Expand All @@ -1589,10 +1600,11 @@
this._assertStatus("EXECUTING");
}

return this.Events.trigger("debug", "Scheduling ".concat(this.options.id), {
eventInfo = {
args: this.args,
options: this.options
});
};
return this.Events.trigger("scheduled", "Scheduled ".concat(this.options.id), eventInfo);
}
}, {
key: "doExecute",
Expand All @@ -1613,15 +1625,12 @@
this._assertStatus("EXECUTING");
}

this.Events.trigger("debug", "Executing ".concat(this.options.id), {
args: this.args,
options: this.options
});
eventInfo = {
args: this.args,
options: this.options,
retryCount: this.retryCount
};
this.Events.trigger("executing", "Executing ".concat(this.options.id), eventInfo);
_context.prev = 3;
_context.next = 6;
return chained != null ? chained.schedule.apply(chained, [this.options, this.task].concat(_toConsumableArray(this.args))) : this.task.apply(this, _toConsumableArray(this.args));
Expand Down Expand Up @@ -1745,7 +1754,6 @@

this._states.next(this.options.id);

this.Events.trigger("debug", "Completed ".concat(this.options.id), eventInfo);
return this.Events.trigger("done", "Completed ".concat(this.options.id), eventInfo);
}
}]);
Expand Down
30 changes: 19 additions & 11 deletions lib/Job.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,28 +79,39 @@ Job = class Job {
}

doReceive() {
var eventInfo;

this._states.start(this.options.id);

return this.Events.trigger("debug", `Queueing ${this.options.id}`, {
eventInfo = {
args: this.args,
options: this.options
};
return this.Events.trigger("received", `Received ${this.options.id}`, {
args: this.args,
options: this.options
});
}

doQueue(reachedHWM, blocked) {
var eventInfo;

this._assertStatus("RECEIVED");

this._states.next(this.options.id);

return this.Events.trigger("debug", `Queued ${this.options.id}`, {
eventInfo = {
args: this.args,
options: this.options,
reachedHWM,
blocked
});
};
return this.Events.trigger("queued", `Queued ${this.options.id}`, eventInfo);
}

doRun() {
var eventInfo;

if (this.retryCount === 0) {
this._assertStatus("QUEUED");

Expand All @@ -109,10 +120,11 @@ Job = class Job {
this._assertStatus("EXECUTING");
}

return this.Events.trigger("debug", `Scheduling ${this.options.id}`, {
eventInfo = {
args: this.args,
options: this.options
});
};
return this.Events.trigger("scheduled", `Scheduled ${this.options.id}`, eventInfo);
}

doExecute(chained, clearGlobalState, run, free) {
Expand All @@ -129,17 +141,14 @@ Job = class Job {
_this._assertStatus("EXECUTING");
}

_this.Events.trigger("debug", `Executing ${_this.options.id}`, {
args: _this.args,
options: _this.options
});

eventInfo = {
args: _this.args,
options: _this.options,
retryCount: _this.retryCount
};

_this.Events.trigger("executing", `Executing ${_this.options.id}`, eventInfo);

try {
passed = yield chained != null ? chained.schedule(_this.options, _this.task, ..._this.args) : _this.task(..._this.args);

Expand Down Expand Up @@ -207,7 +216,6 @@ Job = class Job {

this._states.next(this.options.id);

this.Events.trigger("debug", `Completed ${this.options.id}`, eventInfo);
return this.Events.trigger("done", `Completed ${this.options.id}`, eventInfo);
}

Expand Down
15 changes: 10 additions & 5 deletions light.js
Original file line number Diff line number Diff line change
Expand Up @@ -371,24 +371,30 @@
}

doReceive() {
var eventInfo;
this._states.start(this.options.id);
return this.Events.trigger("debug", `Queueing ${this.options.id}`, {args: this.args, options: this.options});
eventInfo = {args: this.args, options: this.options};
return this.Events.trigger("received", `Received ${this.options.id}`, {args: this.args, options: this.options});
}

doQueue(reachedHWM, blocked) {
var eventInfo;
this._assertStatus("RECEIVED");
this._states.next(this.options.id);
return this.Events.trigger("debug", `Queued ${this.options.id}`, {args: this.args, options: this.options, reachedHWM, blocked});
eventInfo = {args: this.args, options: this.options, reachedHWM, blocked};
return this.Events.trigger("queued", `Queued ${this.options.id}`, eventInfo);
}

doRun() {
var eventInfo;
if (this.retryCount === 0) {
this._assertStatus("QUEUED");
this._states.next(this.options.id);
} else {
this._assertStatus("EXECUTING");
}
return this.Events.trigger("debug", `Scheduling ${this.options.id}`, {args: this.args, options: this.options});
eventInfo = {args: this.args, options: this.options};
return this.Events.trigger("scheduled", `Scheduled ${this.options.id}`, eventInfo);
}

async doExecute(chained, clearGlobalState, run, free) {
Expand All @@ -399,8 +405,8 @@
} else {
this._assertStatus("EXECUTING");
}
this.Events.trigger("debug", `Executing ${this.options.id}`, {args: this.args, options: this.options});
eventInfo = {args: this.args, options: this.options, retryCount: this.retryCount};
this.Events.trigger("executing", `Executing ${this.options.id}`, eventInfo);
try {
passed = (await (chained != null ? chained.schedule(this.options, this.task, ...this.args) : this.task(...this.args)));
if (clearGlobalState()) {
Expand Down Expand Up @@ -444,7 +450,6 @@
doDone(eventInfo) {
this._assertStatus("EXECUTING");
this._states.next(this.options.id);
this.Events.trigger("debug", `Completed ${this.options.id}`, eventInfo);
return this.Events.trigger("done", `Completed ${this.options.id}`, eventInfo);
}

Expand Down
12 changes: 7 additions & 5 deletions src/Job.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,30 @@ class Job

doReceive: () ->
@_states.start @options.id
@Events.trigger "debug", "Queueing #{@options.id}", { @args, @options }
eventInfo = { @args, @options }
@Events.trigger "received", "Received #{@options.id}", { @args, @options }

doQueue: (reachedHWM, blocked) ->
@_assertStatus "RECEIVED"
@_states.next @options.id
@Events.trigger "debug", "Queued #{@options.id}", { @args, @options, reachedHWM, blocked }
eventInfo = { @args, @options, reachedHWM, blocked }
@Events.trigger "queued", "Queued #{@options.id}", eventInfo

doRun: () ->
if @retryCount == 0
@_assertStatus "QUEUED"
@_states.next @options.id
else @_assertStatus "EXECUTING"
@Events.trigger "debug", "Scheduling #{@options.id}", { @args, @options }
eventInfo = { @args, @options }
@Events.trigger "scheduled", "Scheduled #{@options.id}", eventInfo

doExecute: (chained, clearGlobalState, run, free) ->
if @retryCount == 0
@_assertStatus "RUNNING"
@_states.next @options.id
else @_assertStatus "EXECUTING"
@Events.trigger "debug", "Executing #{@options.id}", { @args, @options }
eventInfo = { @args, @options, @retryCount }
@Events.trigger "executing", "Executing #{@options.id}", eventInfo

try
passed = await if chained?
Expand Down Expand Up @@ -91,7 +94,6 @@ class Job
doDone: (eventInfo) ->
@_assertStatus "EXECUTING"
@_states.next @options.id
@Events.trigger "debug", "Completed #{@options.id}", eventInfo
@Events.trigger "done", "Completed #{@options.id}", eventInfo

module.exports = Job
69 changes: 65 additions & 4 deletions test/general.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe('General', function () {
process.env.DATASTORE !== 'redis' && process.env.DATASTORE !== 'ioredis' &&
process.env.BUILD !== 'es5' && process.env.BUILD !== 'light'
) {
it.only('Should not leak memory on instantiation', async function () {
it('Should not leak memory on instantiation', async function () {
c = makeTest()
this.timeout(8000)
const { iterate } = require('leakage')
Expand All @@ -27,9 +27,9 @@ describe('General', function () {

})

it.only('Should not leak memory running jobs', async function () {
it('Should not leak memory running jobs', async function () {
c = makeTest()
this.timeout(81000)
this.timeout(12000)
const { iterate } = require('leakage')
const limiter = new Bottleneck({ datastore: 'local', maxConcurrent: 1, minTime: 10 })
await limiter.ready()
Expand All @@ -44,7 +44,7 @@ describe('General', function () {
i = i + zero + one
}, 0, 1)
}, { iterations: 25 })
c.mustEqual(i, 151)
c.mustEqual(i, 302)
})
}

Expand Down Expand Up @@ -314,6 +314,67 @@ describe('General', function () {
c.checkResultsOrder([[1], [2], [3]])
})
})

it('Should trigger events on status changes', function () {
c = makeTest({maxConcurrent: 2, minTime: 100, trackDoneStatus: true})
var onReceived = 0
var onQueued = 0
var onScheduled = 0
var onExecuting = 0
var onDone = 0
c.limiter.on('received', (message, info) => {
c.mustEqual(Object.keys(info).sort(), ['args', 'options'])
onReceived++
})
c.limiter.on('queued', (message, info) => {
c.mustEqual(Object.keys(info).sort(), ['args', 'blocked', 'options', 'reachedHWM'])
onQueued++
})
c.limiter.on('scheduled', (message, info) => {
c.mustEqual(Object.keys(info).sort(), ['args', 'options'])
onScheduled++
})
c.limiter.on('executing', (message, info) => {
c.mustEqual(Object.keys(info).sort(), ['args', 'options', 'retryCount'])
onExecuting++
})
c.limiter.on('done', (message, info) => {
c.mustEqual(Object.keys(info).sort(), ['args', 'options', 'retryCount'])
onDone++
})

c.mustEqual(c.limiter.counts(), { RECEIVED: 0, QUEUED: 0, RUNNING: 0, EXECUTING: 0, DONE: 0 })

c.pNoErrVal(c.limiter.schedule({ weight: 1, id: 1 }, c.slowPromise, 100, null, 1), 1)
c.pNoErrVal(c.limiter.schedule({ weight: 1, id: 2 }, c.slowPromise, 200, null, 2), 2)
c.pNoErrVal(c.limiter.schedule({ weight: 2, id: 3 }, c.slowPromise, 100, null, 3), 3)
c.mustEqual(c.limiter.counts(), { RECEIVED: 3, QUEUED: 0, RUNNING: 0, EXECUTING: 0, DONE: 0 })

c.mustEqual([onReceived, onQueued, onScheduled, onExecuting, onDone], [3, 0, 0, 0, 0])

return c.wait(50)
.then(function () {
c.mustEqual(c.limiter.counts(), { RECEIVED: 0, QUEUED: 1, RUNNING: 1, EXECUTING: 1, DONE: 0 })
c.mustEqual([onReceived, onQueued, onScheduled, onExecuting, onDone], [3, 3, 2, 1, 0])

return c.wait(100)
})
.then(function () {
c.mustEqual(c.limiter.counts(), { RECEIVED: 0, QUEUED: 1, RUNNING: 0, EXECUTING: 1, DONE: 1 })
c.mustEqual(c.limiter.jobs('DONE'), ['1'])
c.mustEqual(c.limiter.jobs('EXECUTING'), ['2'])
c.mustEqual(c.limiter.jobs('QUEUED'), ['3'])
c.mustEqual([onReceived, onQueued, onScheduled, onExecuting, onDone], [3, 3, 2, 2, 1])

return c.last()
})
.then(function (results) {
c.mustEqual(c.limiter.counts(), { RECEIVED: 0, QUEUED: 0, RUNNING: 0, EXECUTING: 0, DONE: 4 })
c.mustEqual([onReceived, onQueued, onScheduled, onExecuting, onDone], [4, 4, 4, 4, 4])
c.checkDuration(400)
c.checkResultsOrder([[1], [2], [3]])
})
})
})

describe('Events', function () {
Expand Down

0 comments on commit f819836

Please sign in to comment.