Skip to content

Commit 16b93ab

Browse files
authored
fix: Stopping a queue should delete all pending messages immediately #146 (#147)
1 parent 0e511a4 commit 16b93ab

File tree

4 files changed

+916
-940
lines changed

4 files changed

+916
-940
lines changed

package.json

+10-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "squiss-ts",
3-
"version": "5.2.1",
3+
"version": "5.2.2",
44
"description": "High-volume SQS poller",
55
"main": "dist/index.js",
66
"types": "dist/index.d.ts",
@@ -48,9 +48,9 @@
4848
},
4949
"homepage": "https://github.com/PruvoNet/squiss-ts#readme",
5050
"dependencies": {
51-
"@aws-sdk/client-s3": "^3.362.0",
52-
"@aws-sdk/client-sqs": "^3.360.0",
53-
"@aws-sdk/types": "^3.369.0",
51+
"@aws-sdk/client-s3": "^3.385.0",
52+
"@aws-sdk/client-sqs": "^3.385.0",
53+
"@aws-sdk/types": "^3.378.0",
5454
"linked-list": "2.1.0",
5555
"ts-type-guards": "^0.7.0",
5656
"uuid": "^9.0.0"
@@ -60,30 +60,31 @@
6060
"@types/chai": "^4.3.5",
6161
"@types/chai-as-promised": "^7.1.5",
6262
"@types/mocha": "^10.0.1",
63-
"@types/node": "^14.18.48",
63+
"@types/node": "^16.18.39",
6464
"@types/proxyquire": "^1.3.28",
65-
"@types/uuid": "^9.0.1",
65+
"@types/uuid": "^9.0.2",
6666
"chai": "^4.3.7",
6767
"chai-as-promised": "^7.1.1",
6868
"delay": "5.0.0",
6969
"dirty-chai": "^2.0.1",
7070
"mocha": "^10.2.0",
7171
"nyc": "^15.1.0",
7272
"proxyquire": "^2.1.3",
73-
"sinon": "^15.1.0",
73+
"sinon": "^15.2.0",
7474
"sinon-chai": "^3.7.0",
7575
"source-map-support": "^0.5.21",
7676
"ts-node": "^10.9.1",
7777
"tslint": "^6.1.3",
78-
"typescript": "^5.1.3"
78+
"typescript": "^5.1.6"
7979
},
8080
"nyc": {
8181
"extends": "@istanbuljs/nyc-config-typescript",
8282
"check-coverage": true,
8383
"all": true,
8484
"exclude": [
8585
"src/test/**",
86-
"e2e**"
86+
"e2e**",
87+
"build**"
8788
],
8889
"reporter": [
8990
"html",

src/Squiss.ts

+22-11
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,9 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
110110
});
111111
}
112112

113-
public deleteMessage(msg: Message): Promise<void> {
113+
public async deleteMessage(msg: Message): Promise<void> {
114114
if (!msg.raw) {
115-
return Promise.reject(new Error('Squiss.deleteMessage requires a Message object'));
115+
throw new Error('Squiss.deleteMessage requires a Message object');
116116
}
117117
const promise = new Promise<void>((resolve, reject) => {
118118
this._delQueue.set(msg.raw.MessageId!,
@@ -126,11 +126,11 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
126126
clearTimeout(this._delTimer);
127127
this._delTimer = undefined;
128128
}
129-
this._deleteXMessages(this._opts.deleteBatchSize);
129+
await this._deleteXMessages(this._opts.deleteBatchSize);
130130
} else if (!this._delTimer) {
131-
this._delTimer = setTimeout(() => {
131+
this._delTimer = setTimeout(async () => {
132132
this._delTimer = undefined;
133-
this._deleteXMessages();
133+
await this._deleteXMessages();
134134
}, this._opts.deleteWaitMs);
135135
}
136136
return promise;
@@ -312,18 +312,19 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
312312
return this._startPoller();
313313
}
314314

315-
public stop(soft?: boolean, timeout?: number): Promise<boolean> {
315+
public async stop(soft?: boolean, timeout?: number): Promise<boolean> {
316316
if (!soft && this._activeReq) {
317317
this._activeReq.abort();
318318
}
319319
this._running = this._paused = false;
320320
if (!this._inFlight) {
321-
return Promise.resolve(true);
321+
await this._drainDeleteQueue();
322+
return true;
322323
}
323324
let resolved = false;
324325
let timer: any;
325-
return new Promise((resolve) => {
326-
this.on('drained', () => {
326+
const result = await new Promise<boolean>(async (resolve) => {
327+
this.on('drained',() => {
327328
if (!resolved) {
328329
resolved = true;
329330
if (timer) {
@@ -338,6 +339,8 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
338339
resolve(false);
339340
}, timeout) : undefined;
340341
});
342+
await this._drainDeleteQueue();
343+
return result;
341344
}
342345

343346
public getS3(): S3 {
@@ -347,6 +350,14 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
347350
return this._s3;
348351
}
349352

353+
private async _drainDeleteQueue(): Promise<void> {
354+
if (this._delTimer) {
355+
clearTimeout(this._delTimer);
356+
this._delTimer = undefined;
357+
}
358+
await this._deleteXMessages();
359+
}
360+
350361
private _initS3() {
351362
if (this._opts.S3) {
352363
if (typeof this._opts.S3 === 'function') {
@@ -500,15 +511,15 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
500511
})
501512
}
502513

503-
private _deleteXMessages(x?: number) {
514+
private async _deleteXMessages(x?: number): Promise<void> {
504515
const delQueue = this._delQueue;
505516
const iterator = delQueue.entries();
506517
const delBatch = Array.from({length: x || delQueue.size}, function(this: typeof iterator) {
507518
const element = this.next().value;
508519
delQueue.delete(element[0]);
509520
return element[1];
510521
}, iterator);
511-
this._deleteMessages(delBatch);
522+
await this._deleteMessages(delBatch);
512523
}
513524

514525
private _isLargeMessage(message: ISendMessageRequest, minSize?: number): Promise<boolean> {

src/test/src/index.spec.ts

+12-1
Original file line numberDiff line numberDiff line change
@@ -387,22 +387,31 @@ describe('index', () => {
387387
});
388388
it('should resolve when queue already drained', () => {
389389
const spy = sinon.spy();
390+
const deleteSpy = sinon.spy();
390391
inst = new SquissPatched({queueUrl: 'foo'} as ISquissOptions);
391-
inst!.sqs = new SQSStub(0, 1000) as any as SQS;
392+
inst!.sqs = new SQSStub(1, 1000) as any as SQS;
392393
inst!.on('aborted', spy);
394+
inst!.on('deleted', deleteSpy);
395+
inst!.on('message', (msg: Message) => {
396+
msg.del();
397+
});
393398
inst!.start();
394399
return wait().then(() => {
395400
spy.should.not.be.called();
401+
deleteSpy.should.not.be.called();
396402
return inst!.stop(false, 1000);
397403
}).then((result: boolean) => {
404+
deleteSpy.should.be.called();
398405
result.should.eq(true);
399406
});
400407
});
401408
it('should resolve when queue drained before timeout', () => {
402409
const spy = sinon.spy();
410+
const deleteSpy = sinon.spy();
403411
inst = new SquissPatched({queueUrl: 'foo'} as ISquissOptions);
404412
inst!.sqs = new SQSStub(1, 1000) as any as SQS;
405413
inst!.on('aborted', spy);
414+
inst!.on('deleted', deleteSpy);
406415
inst!.on('message', (msg: Message) => {
407416
setTimeout(() => {
408417
msg.del();
@@ -411,8 +420,10 @@ describe('index', () => {
411420
inst!.start();
412421
return wait().then(() => {
413422
spy.should.not.be.called();
423+
deleteSpy.should.not.be.called();
414424
return inst!.stop(false, 10000);
415425
}).then((result: boolean) => {
426+
deleteSpy.should.be.called();
416427
result.should.eq(true);
417428
});
418429
});

0 commit comments

Comments
 (0)