Skip to content

Commit

Permalink
Add .setPriority() for updating priority of a queued promise functi…
Browse files Browse the repository at this point in the history
…on (#209)

Co-authored-by: Sindre Sorhus <[email protected]>
  • Loading branch information
RaishavHanspal and sindresorhus authored Jan 22, 2025
1 parent cbdbbb7 commit 6e5cbc9
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 2 deletions.
44 changes: 44 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ Default: `0`

Priority of operation. Operations with greater priority will be scheduled first.

##### id

Type `string`

Unique identifier for the promise function, used to update its priority before execution. If not specified, it is auto-assigned an incrementing BigInt starting from `1n`.

##### signal

[`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) for cancellation of the operation. When aborted, it will be removed from the queue and the `queue.add()` call will reject with an [error](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal/reason). If the operation is already running, the signal will need to be handled by the operation itself.
Expand Down Expand Up @@ -238,6 +244,44 @@ console.log(queue.sizeBy({priority: 0}));
//=> 1
```

#### .setPriority(id, priority)

Updates the priority of a promise function by its id, affecting its execution order. Requires a defined concurrency limit to take effect.

For example, this can be used to prioritize a promise function to run earlier.

```js
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 1});

queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦀', {priority: 0, id: '🦀'});
queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦄', {priority: 1});

queue.setPriority('🦀', 2);
```

In this case, the promise function with `id: '🦀'` runs second.

You can also deprioritize a promise function to delay its execution:

```js
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 1});

queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦀', {priority: 1, id: '🦀'});
queue.add(async () => '🦄');
queue.add(async () => '🦄', {priority: 0});

queue.setPriority('🦀', -1);
```

Here, the promise function with `id: '🦀'` executes last.

#### .pending

Number of running items (no longer in the queue).
Expand Down
46 changes: 46 additions & 0 deletions source/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT

readonly #throwOnTimeout: boolean;

// Use to assign a unique identifier to a promise function, if not explicitly specified
#idAssigner = 1n;

/**
Per-operation timeout in milliseconds. Operations fulfill once `timeout` elapses if they haven't already.
Expand Down Expand Up @@ -228,12 +231,55 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
});
}

/**
Updates the priority of a promise function by its id, affecting its execution order. Requires a defined concurrency limit to take effect.
For example, this can be used to prioritize a promise function to run earlier.
```js
import PQueue from 'p-queue';
const queue = new PQueue({concurrency: 1});
queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦀', {priority: 0, id: '🦀'});
queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦄', {priority: 1});
queue.setPriority('🦀', 2);
```
In this case, the promise function with `id: '🦀'` runs second.
You can also deprioritize a promise function to delay its execution:
```js
import PQueue from 'p-queue';
const queue = new PQueue({concurrency: 1});
queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦀', {priority: 1, id: '🦀'});
queue.add(async () => '🦄');
queue.add(async () => '🦄', {priority: 0});
queue.setPriority('🦀', -1);
```
Here, the promise function with `id: '🦀'` executes last.
*/
setPriority(id: string, priority: number) {
this.#queue.setPriority(id, priority);
}

/**
Adds a sync or async task to the queue. Always returns a promise.
*/
async add<TaskResultType>(function_: Task<TaskResultType>, options: {throwOnTimeout: true} & Exclude<EnqueueOptionsType, 'throwOnTimeout'>): Promise<TaskResultType>;
async add<TaskResultType>(function_: Task<TaskResultType>, options?: Partial<EnqueueOptionsType>): Promise<TaskResultType | void>;
async add<TaskResultType>(function_: Task<TaskResultType>, options: Partial<EnqueueOptionsType> = {}): Promise<TaskResultType | void> {
// In case `id` is not defined.
options.id ??= (this.#idAssigner++).toString();

options = {
timeout: this.timeout,
throwOnTimeout: this.#throwOnTimeout,
Expand Down
5 changes: 5 additions & 0 deletions source/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ export type QueueAddOptions = {
@default 0
*/
readonly priority?: number;

/**
Unique identifier for the promise function, used to update its priority before execution. If not specified, it is auto-assigned an incrementing BigInt starting from `1n`.
*/
id?: string;
} & TaskOptions & TimeoutOptions;

export type TaskOptions = {
Expand Down
13 changes: 12 additions & 1 deletion source/priority-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ export default class PriorityQueue implements Queue<RunFunction, PriorityQueueOp

const element = {
priority: options.priority,
id: options.id,
run,
};

if (this.size && this.#queue[this.size - 1]!.priority! >= options.priority!) {
if (this.size === 0 || this.#queue[this.size - 1]!.priority! >= options.priority!) {
this.#queue.push(element);
return;
}
Expand All @@ -32,6 +33,16 @@ export default class PriorityQueue implements Queue<RunFunction, PriorityQueueOp
this.#queue.splice(index, 0, element);
}

setPriority(id: string, priority: number) {
const index: number = this.#queue.findIndex((element: Readonly<PriorityQueueOptions>) => element.id === id);
if (index === -1) {
throw new ReferenceError(`No promise function with the id "${id}" exists in the queue.`);
}

const [item] = this.#queue.splice(index, 1);
this.enqueue(item!.run, {priority, id});
}

dequeue(): RunFunction | undefined {
const item = this.#queue.shift();
return item?.run;
Expand Down
1 change: 1 addition & 0 deletions source/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ export type Queue<Element, Options> = {
filter: (options: Readonly<Partial<Options>>) => Element[];
dequeue: () => Element | undefined;
enqueue: (run: Element, options?: Partial<Options>) => void;
setPriority: (id: string, priority: number) => void;
};
163 changes: 162 additions & 1 deletion test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import inRange from 'in-range';
import timeSpan from 'time-span';
import randomInt from 'random-int';
import pDefer from 'p-defer';
import PQueue, {AbortError} from '../source/index.js';
import PQueue from '../source/index.js';

const fixture = Symbol('fixture');

Expand Down Expand Up @@ -1134,3 +1134,164 @@ test('aborting multiple jobs at the same time', async t => {
await t.throwsAsync(task2, {instanceOf: DOMException});
t.like(queue, {size: 0, pending: 0});
});

test('.setPriority() - execute a promise before planned', async t => {
const result: string[] = [];
const queue = new PQueue({concurrency: 1});
queue.add(async () => {
await delay(400);
result.push('🐌');
}, {id: '🐌'});
queue.add(async () => {
await delay(400);
result.push('🦆');
}, {id: '🦆'});
queue.add(async () => {
await delay(400);
result.push('🐢');
}, {id: '🐢'});
queue.setPriority('🐢', 1);
await queue.onIdle();
t.deepEqual(result, ['🐌', '🐢', '🦆']);
});

test('.setPriority() - execute a promise after planned', async t => {
const result: string[] = [];
const queue = new PQueue({concurrency: 1});
queue.add(async () => {
await delay(400);
result.push('🐌');
}, {id: '🐌'});
queue.add(async () => {
await delay(400);
result.push('🦆');
}, {id: '🦆'});
queue.add(async () => {
await delay(400);
result.push('🦆');
}, {id: '🦆'});
queue.add(async () => {
await delay(400);
result.push('🐢');
}, {id: '🐢'});
queue.add(async () => {
await delay(400);
result.push('🦆');
}, {id: '🦆'});
queue.add(async () => {
await delay(400);
result.push('🦆');
}, {id: '🦆'});
queue.setPriority('🐢', -1);
await queue.onIdle();
t.deepEqual(result, ['🐌', '🦆', '🦆', '🦆', '🦆', '🐢']);
});

test('.setPriority() - execute a promise before planned - concurrency 2', async t => {
const result: string[] = [];
const queue = new PQueue({concurrency: 2});
queue.add(async () => {
await delay(400);
result.push('🐌');
}, {id: '🐌'});
queue.add(async () => {
await delay(400);
result.push('🦆');
}, {id: '🦆'});
queue.add(async () => {
await delay(400);
result.push('🐢');
}, {id: '🐢'});
queue.add(async () => {
await delay(400);
result.push('⚡️');
}, {id: '⚡️'});
queue.setPriority('⚡️', 1);
await queue.onIdle();
t.deepEqual(result, ['🐌', '🦆', '⚡️', '🐢']);
});

test('.setPriority() - execute a promise before planned - concurrency 3', async t => {
const result: string[] = [];
const queue = new PQueue({concurrency: 3});
queue.add(async () => {
await delay(400);
result.push('🐌');
}, {id: '🐌'});
queue.add(async () => {
await delay(400);
result.push('🦆');
}, {id: '🦆'});
queue.add(async () => {
await delay(400);
result.push('🐢');
}, {id: '🐢'});
queue.add(async () => {
await delay(400);
result.push('⚡️');
}, {id: '⚡️'});
queue.add(async () => {
await delay(400);
result.push('🦀');
}, {id: '🦀'});
queue.setPriority('🦀', 1);
await queue.onIdle();
t.deepEqual(result, ['🐌', '🦆', '🐢', '🦀', '⚡️']);
});

test('.setPriority() - execute a multiple promise before planned, with variable priority', async t => {
const result: string[] = [];
const queue = new PQueue({concurrency: 2});
queue.add(async () => {
await delay(400);
result.push('🐌');
}, {id: '🐌'});
queue.add(async () => {
await delay(400);
result.push('🦆');
}, {id: '🦆'});
queue.add(async () => {
await delay(400);
result.push('🐢');
}, {id: '🐢'});
queue.add(async () => {
await delay(400);
result.push('⚡️');
}, {id: '⚡️'});
queue.add(async () => {
await delay(400);
result.push('🦀');
}, {id: '🦀'});
queue.setPriority('⚡️', 1);
queue.setPriority('🦀', 2);
await queue.onIdle();
t.deepEqual(result, ['🐌', '🦆', '🦀', '⚡️', '🐢']);
});

test('.setPriority() - execute a promise before planned - concurrency 3 and unspecified `id`', async t => {
const result: string[] = [];
const queue = new PQueue({concurrency: 3});
queue.add(async () => {
await delay(400);
result.push('🐌');
});
queue.add(async () => {
await delay(400);
result.push('🦆');
});
queue.add(async () => {
await delay(400);
result.push('🐢');
});
queue.add(async () => {
await delay(400);
result.push('⚡️');
});
queue.add(async () => {
await delay(400);
result.push('🦀');
});
queue.setPriority('5', 1);
await queue.onIdle();
t.deepEqual(result, ['🐌', '🦆', '🐢', '🦀', '⚡️']);
});

0 comments on commit 6e5cbc9

Please sign in to comment.