Skip to content

Commit

Permalink
Add add and next events (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
kheyse-werk authored Jul 19, 2020
1 parent 1e9dcf8 commit edd2e8a
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 1 deletion.
34 changes: 34 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,40 @@ await queue.add(() => delay(600));

The `idle` event is emitted every time the queue reaches an idle state. On the other hand, the promise the `onIdle()` function returns resolves once the queue becomes idle instead of every time the queue is idle.

#### add

Emitted every time the add method is called and the number of pending or queued tasks is increased.

#### next

Emitted every time a task is completed and the number of pending or queued tasks is decreased.

```js
const delay = require('delay');
const {default: PQueue} = require('p-queue');

const queue = new PQueue();

queue.on('add', () => {
console.log(`Task is added. Size: ${queue.size} Pending: ${queue.pending}`);
});
queue.on('next', () => {
console.log(`Task is completed. Size: ${queue.size} Pending: ${queue.pending}`);
});

const job1 = queue.add(() => delay(2000));
const job2 = queue.add(() => delay(500));

await job1;
await job2;
// => 'Task is added. Size: 0 Pending: 1'
// => 'Task is added. Size: 0 Pending: 2'

await queue.add(() => delay(600));
// => 'Task is completed. Size: 0 Pending: 1'
// => 'Task is completed. Size: 0 Pending: 0'
```

## Advanced example

A more advanced example to help you understand the flow.
Expand Down
4 changes: 3 additions & 1 deletion source/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const timeoutError = new TimeoutError();
/**
Promise queue with concurrency control.
*/
export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active' | 'idle'> {
export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active' | 'idle' | 'add' | 'next'> {
private readonly _carryoverConcurrencyCount: boolean;

private readonly _isIntervalIgnored: boolean;
Expand Down Expand Up @@ -99,6 +99,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
private _next(): void {
this._pendingCount--;
this._tryToStartAnother();
this.emit('next');
}

private _resolvePromises(): void {
Expand Down Expand Up @@ -255,6 +256,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT

this._queue.enqueue(run, options);
this._tryToStartAnother();
this.emit('add');
});
}

Expand Down
86 changes: 86 additions & 0 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,92 @@ test('should emit idle event when idle', async t => {
t.is(queue.size, 0);
t.is(timesCalled, 2);
});
test('should emit add event when adding task', async t => {
const queue = new PQueue({concurrency: 1});

let timesCalled = 0;
queue.on('add', () => {
timesCalled++;
});

const job1 = queue.add(async () => delay(100));

t.is(queue.pending, 1);
t.is(queue.size, 0);
t.is(timesCalled, 1);

const job2 = queue.add(async () => delay(100));

t.is(queue.pending, 1);
t.is(queue.size, 1);
t.is(timesCalled, 2);

await job1;

t.is(queue.pending, 1);
t.is(queue.size, 0);
t.is(timesCalled, 2);

await job2;

t.is(queue.pending, 0);
t.is(queue.size, 0);
t.is(timesCalled, 2);

const job3 = queue.add(async () => delay(100));

t.is(queue.pending, 1);
t.is(queue.size, 0);
t.is(timesCalled, 3);

await job3;
t.is(queue.pending, 0);
t.is(queue.size, 0);
t.is(timesCalled, 3);
});
test('should emit next event when completing task', async t => {
const queue = new PQueue({concurrency: 1});

let timesCalled = 0;
queue.on('next', () => {
timesCalled++;
});

const job1 = queue.add(async () => delay(100));

t.is(queue.pending, 1);
t.is(queue.size, 0);
t.is(timesCalled, 0);

const job2 = queue.add(async () => delay(100));

t.is(queue.pending, 1);
t.is(queue.size, 1);
t.is(timesCalled, 0);

await job1;

t.is(queue.pending, 1);
t.is(queue.size, 0);
t.is(timesCalled, 1);

await job2;

t.is(queue.pending, 0);
t.is(queue.size, 0);
t.is(timesCalled, 2);

const job3 = queue.add(async () => delay(100));

t.is(queue.pending, 1);
t.is(queue.size, 0);
t.is(timesCalled, 2);

await job3;
t.is(queue.pending, 0);
t.is(queue.size, 0);
t.is(timesCalled, 3);
});

test('should verify timeout overrides passed to add', async t => {
const queue = new PQueue({timeout: 200, throwOnTimeout: true});
Expand Down

0 comments on commit edd2e8a

Please sign in to comment.