Skip to content

Commit

Permalink
Refactor to optimize memory usage, step 2
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon Grondin committed May 8, 2019
1 parent 3d76c11 commit 0e397c1
Show file tree
Hide file tree
Showing 10 changed files with 938 additions and 624 deletions.
627 changes: 374 additions & 253 deletions es5.js

Large diffs are not rendered by default.

187 changes: 53 additions & 134 deletions lib/Bottleneck.js
Original file line number Diff line number Diff line change
Expand Up @@ -159,110 +159,52 @@ Bottleneck = function () {
return this._store.__check__(weight);
}

_handler(index, job, error, passed) {
_clearGlobalState(index) {
if (this._scheduled[index] != null) {
clearTimeout(this._scheduled[index].expiration);
delete this._scheduled[index];
return true;
} else {
return false;
}
}

_free(index, job, options, eventInfo) {
var _this = this;

return _asyncToGenerator(function* () {
var args, e, eventInfo, options, retry, retryAfter, retryCount, running;
var e, running;

try {
if (_this._scheduled[index] == null) {
return;
}

args = job.args;
options = job.options;
retryCount = job.retryCount;
clearTimeout(_this._scheduled[index].expiration);
delete _this._scheduled[index];
eventInfo = {
args,
options,
retryCount
};

if (error != null) {
retry = yield _this.Events.trigger("failed", error, eventInfo);

if (retry != null) {
retryAfter = ~~retry;

_this.Events.trigger("retry", `Retrying ${options.id} after ${retryAfter} ms`, eventInfo);

job.retryCount++;
return _this._run(job, retryAfter, index);
}
}

_this._states.next(options.id); // DONE


_this.Events.trigger("debug", `Completed ${options.id}`, eventInfo);

_this.Events.trigger("done", `Completed ${options.id}`, eventInfo);

var _ref = yield _this._store.__free__(index, options.weight);

running = _ref.running;

_this.Events.trigger("debug", `Freed ${options.id}`, eventInfo);

if (running === 0 && _this.empty()) {
_this.Events.trigger("idle");
return _this.Events.trigger("idle");
}

return job.done(error, passed);
} catch (error1) {
e = error1;
return _this.Events.trigger("error", e);
}
})();
}

_run(job, wait, index) {
var args, options, retryCount, task;
task = job.task;
args = job.args;
options = job.options;
retryCount = job.retryCount;
this.Events.trigger("debug", `Scheduling ${options.id}`, {
args,
options
});

if (retryCount === 0) {
// RUNNING
this._states.next(options.id);
}

_run(index, job, wait) {
var clearGlobalState, free, run;
job.doRun();
clearGlobalState = this._clearGlobalState.bind(this, index);
run = this._run.bind(this, index, job);
free = this._free.bind(this, index, job);
return this._scheduled[index] = {
timeout: setTimeout(() => {
var handler;
this.Events.trigger("debug", `Executing ${options.id}`, {
args,
options
});

if (retryCount === 0) {
// EXECUTING
this._states.next(options.id);
}

handler = this._handler.bind(this, index, job);

if (this._limiter != null) {
return this._limiter.schedule(options, task, ...args).then(function (passed) {
return handler(null, passed);
}).catch(function (error) {
return handler(error);
});
} else {
return job.execute(handler);
}
return job.doExecute(this._limiter, clearGlobalState, run, free);
}, wait),
expiration: options.expiration != null ? setTimeout(() => {
return this._handler(index, job, new Bottleneck.prototype.BottleneckError(`This job timed out after ${options.expiration} ms.`));
}, wait + options.expiration) : void 0,
expiration: job.options.expiration != null ? setTimeout(function () {
return job.doExpire(clearGlobalState, run, free);
}, wait + job.options.expiration) : void 0,
job: job
};
}
Expand Down Expand Up @@ -315,7 +257,7 @@ Bottleneck = function () {
this.Events.trigger("depleted", empty);
}

this._run(next, wait, index);
this._run(index, next, wait);

return this.Promise.resolve(options.weight);
} else {
Expand All @@ -340,19 +282,11 @@ Bottleneck = function () {
});
}

_drop(job, message = "This job has been dropped by Bottleneck") {
if (this._states.remove(job.options.id)) {
if (this.rejectOnDrop) {
job.reject(new Bottleneck.prototype.BottleneckError(message));
}

return this.Events.trigger("dropped", job);
}
}

_dropAllQueued(message) {
return this._queues.shiftAll(job => {
return this._drop(job, message);
return this._queues.shiftAll(function (job) {
return job.doDrop({
message
});
});
}

Expand Down Expand Up @@ -383,8 +317,10 @@ Bottleneck = function () {
});
};

done = options.dropWaitingJobs ? (this._run = next => {
return this._drop(next, options.dropErrorMessage);
done = options.dropWaitingJobs ? (this._run = function (index, next) {
return next.doDrop({
message: options.dropErrorMessage
});
}, this._drainOne = () => {
return this.Promise.resolve(null);
}, this._registerLock.schedule(() => {
Expand All @@ -398,8 +334,9 @@ Bottleneck = function () {
if (this.jobStatus(v.job.options.id) === "RUNNING") {
clearTimeout(v.timeout);
clearTimeout(v.expiration);

this._drop(v.job, options.dropErrorMessage);
v.job.doDrop({
message: options.dropErrorMessage
});
}
}

Expand All @@ -414,8 +351,8 @@ Bottleneck = function () {
return waitForExecuting(1);
});

this._addToQueue = job => {
return job.reject(new Bottleneck.prototype.BottleneckError(options.enqueueErrorMessage));
this._addToQueue = function (job) {
return job._reject(new Bottleneck.prototype.BottleneckError(options.enqueueErrorMessage));
};

this.stop = () => {
Expand All @@ -428,80 +365,62 @@ Bottleneck = function () {
_addToQueue(job) {
var _this2 = this;

var args, options, reject;
var args, options;
args = job.args;
options = job.options;
reject = job.reject;

if (this.jobStatus(options.id) != null) {
reject(new Bottleneck.prototype.BottleneckError(`A job with the same id already exists (id=${options.id})`));
if (!job.doReceive()) {
return false;
}

this._states.start(options.id); // RECEIVED


this.Events.trigger("debug", `Queueing ${options.id}`, {
args,
options
});
return this._submitLock.schedule(
/*#__PURE__*/
_asyncToGenerator(function* () {
var blocked, e, reachedHWM, shifted, strategy;
var blocked, error, reachedHWM, shifted, strategy;

try {
var _ref3 = yield _this2._store.__submit__(_this2.queued(), options.weight);

reachedHWM = _ref3.reachedHWM;
blocked = _ref3.blocked;
strategy = _ref3.strategy;

_this2.Events.trigger("debug", `Queued ${options.id}`, {
args,
options,
reachedHWM,
blocked
});
} catch (error1) {
e = error1;

_this2._states.remove(options.id);
error = error1;

_this2.Events.trigger("debug", `Could not queue ${options.id}`, {
args,
options,
error: e
error
});

reject(e);
job.doDrop({
error
});
return false;
}

if (blocked) {
_this2._drop(job);

job.doDrop();
return true;
} else if (reachedHWM) {
shifted = strategy === Bottleneck.prototype.strategy.LEAK ? _this2._queues.shiftLastFrom(options.priority) : strategy === Bottleneck.prototype.strategy.OVERFLOW_PRIORITY ? _this2._queues.shiftLastFrom(options.priority + 1) : strategy === Bottleneck.prototype.strategy.OVERFLOW ? job : void 0;

if (shifted != null) {
_this2._drop(shifted);
shifted.doDrop();
}

if (shifted == null || strategy === Bottleneck.prototype.strategy.OVERFLOW) {
if (shifted == null) {
_this2._drop(job);
job.doDrop();
}

return reachedHWM;
}
}

_this2._states.next(options.id); // QUEUED

job.doQueue(reachedHWM, blocked);

_this2._queues.push(options.priority, job);
_this2._queues.push(job);

yield _this2._drainAll();
return reachedHWM;
Expand All @@ -524,14 +443,14 @@ Bottleneck = function () {
}

task = (...args) => {
return new this.Promise((resolve, reject) => {
return new this.Promise(function (resolve, reject) {
return fn(...args, function (...args) {
return (args[0] != null ? reject : resolve)(args);
});
});
};

job = new Job(task, args, options, this.jobDefaults, this.Promise, NUM_PRIORITIES, DEFAULT_PRIORITY);
job = new Job(task, args, options, this.jobDefaults, this.rejectOnDrop, this.Events, this._states, this.Promise);
job.promise.then(function (args) {
return typeof cb === "function" ? cb(...args) : void 0;
}).catch(function (args) {
Expand Down Expand Up @@ -565,7 +484,7 @@ Bottleneck = function () {
args = _args4.slice(2);
}

job = new Job(task, args, options, this.jobDefaults, this.Promise, NUM_PRIORITIES, DEFAULT_PRIORITY);
job = new Job(task, args, options, this.jobDefaults, this.rejectOnDrop, this.Events, this._states, this.Promise);

this._addToQueue(job);

Expand Down
Loading

0 comments on commit 0e397c1

Please sign in to comment.