Skip to content

Commit

Permalink
Add .sizeBy() method to find queue size by priority (#94)
Browse files Browse the repository at this point in the history
Co-authored-by: Sindre Sorhus <[email protected]>
  • Loading branch information
bobjflong and sindresorhus authored Feb 17, 2020
1 parent c2e768f commit a9f5778
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 9 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
"random-int": "^2.0.0",
"time-span": "^3.1.0",
"ts-node": "^8.3.0",
"typescript": "^3.7.2",
"typescript": "3.7.2",
"xo": "^0.25.3"
},
"ava": {
Expand Down
28 changes: 28 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,27 @@ Clear the queue.

Size of the queue.


#### .sizeBy(options)

Size of the queue, filtered by the given options.

For example, this can be used to find the number of items remaining in the queue with a specific priority level.

```js
const queue = new PQueue();

queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦄', {priority: 0});
queue.add(async () => '🦄', {priority: 1});

console.log(queue.sizeBy({priority: 1}));
//=> 2

console.log(queue.sizeBy({priority: 0}));
//=> 1
```

#### .pending

Number of pending promises.
Expand Down Expand Up @@ -284,15 +305,22 @@ class QueueClass {
constructor() {
this._queue = [];
}

enqueue(run, options) {
this._queue.push(run);
}

dequeue() {
return this._queue.shift();
}

get size() {
return this._queue.length;
}

filter(options) {
return this._queue;
}
}
```

Expand Down
13 changes: 11 additions & 2 deletions source/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import EventEmitter = require('eventemitter3');
import {default as pTimeout, TimeoutError} from 'p-timeout';
import {Queue} from './queue';
import {Queue, RunFunction} from './queue';
import PriorityQueue from './priority-queue';
import {QueueAddOptions, DefaultAddOptions, Options} from './options';

Expand All @@ -17,7 +17,7 @@ const timeoutError = new TimeoutError();
/**
Promise queue with concurrency control.
*/
export default class PQueue<QueueType extends Queue<EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active'> {
export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active'> {
private readonly _carryoverConcurrencyCount: boolean;

private readonly _isIntervalIgnored: boolean;
Expand Down Expand Up @@ -344,6 +344,15 @@ export default class PQueue<QueueType extends Queue<EnqueueOptionsType> = Priori
return this._queue.size;
}

/**
Size of the queue, filtered by the given options.
For example, this can be used to find the number of items remaining in the queue with a specific priority level.
*/
sizeBy(options: Partial<EnqueueOptionsType>): number {
return this._queue.filter(options).length;
}

/**
Number of pending promises.
*/
Expand Down
4 changes: 2 additions & 2 deletions source/options.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {Queue} from './queue';
import {Queue, RunFunction} from './queue';

export interface QueueAddOptions {
readonly [key: string]: unknown;
}

export interface Options<QueueType extends Queue<QueueOptions>, QueueOptions extends QueueAddOptions> {
export interface Options<QueueType extends Queue<RunFunction, QueueOptions>, QueueOptions extends QueueAddOptions> {
/**
Concurrency limit.
Expand Down
6 changes: 5 additions & 1 deletion source/priority-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export interface PriorityQueueOptions extends QueueAddOptions {
priority?: number;
}

export default class PriorityQueue implements Queue<PriorityQueueOptions> {
export default class PriorityQueue implements Queue<RunFunction, PriorityQueueOptions> {
private readonly _queue: Array<PriorityQueueOptions & {run: RunFunction}> = [];

enqueue(run: RunFunction, options?: Partial<PriorityQueueOptions>): void {
Expand Down Expand Up @@ -34,6 +34,10 @@ export default class PriorityQueue implements Queue<PriorityQueueOptions> {
return item && item.run;
}

filter(options: Partial<PriorityQueueOptions>): RunFunction[] {
return this._queue.filter(element => element.priority === options.priority).map(element => element.run);
}

get size(): number {
return this._queue.length;
}
Expand Down
7 changes: 4 additions & 3 deletions source/queue.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
export type RunFunction = () => Promise<unknown>;

export interface Queue<Options> {
export interface Queue<Element, Options> {
size: number;
dequeue(): RunFunction | undefined;
enqueue(run: RunFunction, options?: Partial<Options>): void;
filter(options: Partial<Options>): Element[];
dequeue(): Element | undefined;
enqueue(run: Element, options?: Partial<Options>): void;
}
14 changes: 14 additions & 0 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,20 @@ test('.add() - priority', async t => {
t.deepEqual(result, [1, 3, 1, 2, 0, 0]);
});

test('.sizeBy() - priority', async t => {
const queue = new PQueue();
queue.pause();
queue.add(async () => 0, {priority: 1});
queue.add(async () => 0, {priority: 0});
queue.add(async () => 0, {priority: 1});
t.is(queue.sizeBy({priority: 1}), 2);
t.is(queue.sizeBy({priority: 0}), 1);
queue.clear();
await queue.onEmpty();
t.is(queue.sizeBy({priority: 1}), 0);
t.is(queue.sizeBy({priority: 0}), 0);
});

test('.add() - timeout without throwing', async t => {
const result: string[] = [];
const queue = new PQueue({timeout: 300, throwOnTimeout: false});
Expand Down

0 comments on commit a9f5778

Please sign in to comment.