From edd2e8a8661d47039bb81c3d264b87cedd8de18b Mon Sep 17 00:00:00 2001 From: Karel Heyse <35029144+kheyse-oqton@users.noreply.github.com> Date: Sun, 19 Jul 2020 22:40:23 +0200 Subject: [PATCH] Add `add` and `next` events (#111) --- readme.md | 34 +++++++++++++++++++ source/index.ts | 4 ++- test/test.ts | 86 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 123 insertions(+), 1 deletion(-) diff --git a/readme.md b/readme.md index ab87528..332f0f2 100644 --- a/readme.md +++ b/readme.md @@ -245,6 +245,40 @@ await queue.add(() => delay(600)); The `idle` event is emitted every time the queue reaches an idle state. On the other hand, the promise the `onIdle()` function returns resolves once the queue becomes idle instead of every time the queue is idle. +#### add + +Emitted every time the add method is called and the number of pending or queued tasks is increased. + +#### next + +Emitted every time a task is completed and the number of pending or queued tasks is decreased. + +```js +const delay = require('delay'); +const {default: PQueue} = require('p-queue'); + +const queue = new PQueue(); + +queue.on('add', () => { + console.log(`Task is added. Size: ${queue.size} Pending: ${queue.pending}`); +}); +queue.on('next', () => { + console.log(`Task is completed. Size: ${queue.size} Pending: ${queue.pending}`); +}); + +const job1 = queue.add(() => delay(2000)); +const job2 = queue.add(() => delay(500)); + +await job1; +await job2; +// => 'Task is added. Size: 0 Pending: 1' +// => 'Task is added. Size: 0 Pending: 2' + +await queue.add(() => delay(600)); +// => 'Task is completed. Size: 0 Pending: 1' +// => 'Task is completed. Size: 0 Pending: 0' +``` + ## Advanced example A more advanced example to help you understand the flow. diff --git a/source/index.ts b/source/index.ts index fc3e2f5..c5bd847 100644 --- a/source/index.ts +++ b/source/index.ts @@ -18,7 +18,7 @@ const timeoutError = new TimeoutError(); /** Promise queue with concurrency control. */ -export default class PQueue = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active' | 'idle'> { +export default class PQueue = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active' | 'idle' | 'add' | 'next'> { private readonly _carryoverConcurrencyCount: boolean; private readonly _isIntervalIgnored: boolean; @@ -99,6 +99,7 @@ export default class PQueue { t.is(queue.size, 0); t.is(timesCalled, 2); }); +test('should emit add event when adding task', async t => { + const queue = new PQueue({concurrency: 1}); + + let timesCalled = 0; + queue.on('add', () => { + timesCalled++; + }); + + const job1 = queue.add(async () => delay(100)); + + t.is(queue.pending, 1); + t.is(queue.size, 0); + t.is(timesCalled, 1); + + const job2 = queue.add(async () => delay(100)); + + t.is(queue.pending, 1); + t.is(queue.size, 1); + t.is(timesCalled, 2); + + await job1; + + t.is(queue.pending, 1); + t.is(queue.size, 0); + t.is(timesCalled, 2); + + await job2; + + t.is(queue.pending, 0); + t.is(queue.size, 0); + t.is(timesCalled, 2); + + const job3 = queue.add(async () => delay(100)); + + t.is(queue.pending, 1); + t.is(queue.size, 0); + t.is(timesCalled, 3); + + await job3; + t.is(queue.pending, 0); + t.is(queue.size, 0); + t.is(timesCalled, 3); +}); +test('should emit next event when completing task', async t => { + const queue = new PQueue({concurrency: 1}); + + let timesCalled = 0; + queue.on('next', () => { + timesCalled++; + }); + + const job1 = queue.add(async () => delay(100)); + + t.is(queue.pending, 1); + t.is(queue.size, 0); + t.is(timesCalled, 0); + + const job2 = queue.add(async () => delay(100)); + + t.is(queue.pending, 1); + t.is(queue.size, 1); + t.is(timesCalled, 0); + + await job1; + + t.is(queue.pending, 1); + t.is(queue.size, 0); + t.is(timesCalled, 1); + + await job2; + + t.is(queue.pending, 0); + t.is(queue.size, 0); + t.is(timesCalled, 2); + + const job3 = queue.add(async () => delay(100)); + + t.is(queue.pending, 1); + t.is(queue.size, 0); + t.is(timesCalled, 2); + + await job3; + t.is(queue.pending, 0); + t.is(queue.size, 0); + t.is(timesCalled, 3); +}); test('should verify timeout overrides passed to add', async t => { const queue = new PQueue({timeout: 200, throwOnTimeout: true});