Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to delete jobs in a queue by function alone without args #561

Merged
merged 1 commit into from
Apr 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ You can read the API docs for Node Resque @ [node-resque.actionherojs.com](https
## Version Notes

- The version of redis required is >= 2.6.0 as we use lua scripting to create custom atomic operations
- ‼️ Version 6+ of Node Resque uses TypeScript. We will still include JavaScript transpiled code in NPM releases, but they will be generated from the TypeScript source. Functinality between node-resque v5 and v6 should be the same.
- ‼️ Version 6+ of Node Resque uses TypeScript. We will still include JavaScript transpiled code in NPM releases, but they will be generated from the TypeScript source. Functionality between node-resque v5 and v6 should be the same.
- ‼️ Version 5+ of Node Resque uses async/await. There is no upgrade path from previous versions. Node v8.0.0+ is required.

## Usage
Expand Down
26 changes: 24 additions & 2 deletions __tests__/core/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ describe("queue", () => {
expect(timestamps.length).toBe(0);
});

test("can deleted an enqued job", async () => {
test("can delete an enqueued job", async () => {
await queue.enqueue(specHelper.queue, "someJob", [1, 2, 3]);
const length = await queue.length(specHelper.queue);
expect(length).toBe(1);
Expand All @@ -221,7 +221,29 @@ describe("queue", () => {
expect(lengthAgain).toBe(0);
});

test("can deleted a delayed job", async () => {
test("can delete all enqueued jobs of a particular function/class", async () => {
await queue.enqueue(specHelper.queue, "someJob1", [1, 2, 3]);
const length = await queue.length(specHelper.queue);
expect(length).toBe(1);

await queue.enqueue(specHelper.queue, "someJob1", [1, 2, 3]);
const lengthAgain = await queue.length(specHelper.queue);
expect(lengthAgain).toBe(2);

await queue.enqueue(specHelper.queue, "someJob2", [1, 2, 3]);
const lengthOnceAgain = await queue.length(specHelper.queue);
expect(lengthOnceAgain).toBe(3);

const countDeleted = await queue.delByFunction(
specHelper.queue,
"someJob1"
);
const lengthFinally = await queue.length(specHelper.queue);
expect(countDeleted).toBe(2);
expect(lengthFinally).toBe(1);
});

test("can delete a delayed job", async () => {
await queue.enqueueAt(10000, specHelper.queue, "someJob", [1, 2, 3]);
const timestamps = await queue.delDelayed(specHelper.queue, "someJob", [
1,
Expand Down
37 changes: 36 additions & 1 deletion src/core/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ export class Queue extends EventEmitter {
}

/**
* - jobs are deleted by those matching a `func` and agument collection within a given queue.
* - jobs are deleted by those matching a `func` and argument collection within a given queue.
* - You might match none, or you might match many.
*/
async del(q: string, func: string, args: Array<any> = [], count: number = 0) {
Expand All @@ -172,6 +172,41 @@ export class Queue extends EventEmitter {
);
}

/**
* delByFunction
* @param q - queue to look in
* @param func - function name to delete any jobs with
* @param start - optional place to start looking in list (default: beginning of list)
* @param stop - optional place to end looking in list (default: end of list)
* @returns number of jobs deleted from queue
*/
async delByFunction(
q: string,
func: string,
start: number = 0,
stop: number = -1
) {
const jobs = await this.connection.redis.lrange(
this.connection.key("queue", q),
start,
stop
);
let numJobsDeleted: number = 0;
for (let i = 0; i < jobs.length; i++) {
const jobEncoded = jobs[i];
const { class: jobFunc } = JSON.parse(jobEncoded);
if (jobFunc === func) {
await this.connection.redis.lrem(
this.connection.key("queue", q),
0,
jobEncoded
);
numJobsDeleted++;
}
}
return numJobsDeleted;
Comment on lines +189 to +207
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little worried here about the delay between reading and deleting. If you find your job in position 4 in the range, and then delete the job in position it might have been moved to a new position, or out of the queue entirely but the time you call lrem.

The good news is that lrem compares the contents of jobEncoded, so it won't delete the wrong job by accident. The bad news is that you might miss new jobs that have been added as the queue is changing under you. Think that will be a problem?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only thing I think might help a little bit with this, although will absolutely not resolve entirely would be to use pipelining to batch the lrem commands. In saying that though jobs are/can be sill being worked as the pipeline is being built so the problem doesn't go away entirely.

In my case this is not much of an issue, I simply want to be able to just clear a queue from a particular function/class and the fact that jobs can be worked in the middle isn't a big deal, but I'm not sure if theres another use case where this could be a bigger issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point - unless you bring LUA into the mix, I guess we can't really guarantee that we deleted all the jobs. Could you add a note to that point in the comments? "best-effort" and all that...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll merge this in and add the note on master - thanks for your contribution!

}

async delDelayed(q: string, func: string, args: Array<any> = []) {
const timestamps = [];
args = arrayify(args);
Expand Down