Skip to content

Commit 64693f2

Browse files
committed
fix(jetstream): more resilient ordered consumers and protocol message handling
Signed-off-by: Alberto Ricart <[email protected]>
1 parent 539a717 commit 64693f2

20 files changed

+61
-46
lines changed

Diff for: core/deno.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,4 @@
3838
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
3939
"@nats-io/nuid": "jsr:@nats-io/[email protected]"
4040
}
41-
}
41+
}

Diff for: core/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,4 @@
4343
"typedoc": "^0.26.10",
4444
"typescript": "^5.5.4"
4545
}
46-
}
46+
}

Diff for: jetstream/deno.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nats-io/jetstream",
3-
"version": "3.0.0-21",
3+
"version": "3.0.0-22",
44
"exports": {
55
".": "./src/mod.ts",
66
"./internal": "./src/internal_mod.ts"
@@ -35,4 +35,4 @@
3535
"imports": {
3636
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34"
3737
}
38-
}
38+
}

Diff for: jetstream/import_map.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@
77
"test_helpers": "../test_helpers/mod.ts",
88
"@std/io": "jsr:@std/[email protected]"
99
}
10-
}
10+
}

Diff for: jetstream/package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nats-io/jetstream",
3-
"version": "3.0.0-21",
3+
"version": "3.0.0-22",
44
"files": [
55
"lib/",
66
"LICENSE",
@@ -42,4 +42,4 @@
4242
"typedoc": "^0.26.10",
4343
"typescript": "^5.6.3"
4444
}
45-
}
45+
}

Diff for: jetstream/src/consumer.ts

+11-8
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,15 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
166166
}
167167
this.monitor?.work();
168168

169-
const isProtocol = msg.subject === this.inbox;
169+
const isProtocol = this.consumer.ordered
170+
? msg.subject.indexOf(this?.inboxPrefix!) === 0
171+
: msg.subject === this.inbox;
172+
170173
if (isProtocol) {
174+
if (msg.subject !== (this.sub as SubscriptionImpl).subject) {
175+
// this is a stale message - was not sent to the current inbox
176+
return;
177+
}
171178
const status = new JetStreamStatus(msg);
172179

173180
if (status.isIdleHeartbeat()) {
@@ -178,7 +185,6 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
178185
const description = status.description;
179186

180187
const { msgsLeft, bytesLeft } = status.parseDiscard();
181-
console.log("pending", msgsLeft, bytesLeft);
182188
if ((msgsLeft && msgsLeft > 0) || (bytesLeft && bytesLeft > 0)) {
183189
this.pending.msgs -= msgsLeft;
184190
this.pending.bytes -= bytesLeft;
@@ -842,15 +848,12 @@ export class PullConsumerImpl implements Consumer {
842848
return ci;
843849
}
844850

845-
info(cached = false): Promise<ConsumerInfo> {
851+
async info(cached = false): Promise<ConsumerInfo> {
846852
if (cached) {
847853
return Promise.resolve(this._info);
848854
}
849855
const { stream_name, name } = this._info;
850-
return this.api.info(stream_name, name)
851-
.then((ci) => {
852-
this._info = ci;
853-
return this._info;
854-
});
856+
this._info = await this.api.info(stream_name, name);
857+
return this._info;
855858
}
856859
}

Diff for: jetstream/src/jsbaseclient_api.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,8 @@ export class BaseApiClientImpl {
119119
} catch (err) {
120120
const re = err instanceof RequestError ? err as RequestError : null;
121121
if (
122-
err instanceof errors.TimeoutError ||
123-
re?.isNoResponders() && i + 1 < retries
122+
(err instanceof errors.TimeoutError || re?.isNoResponders()) &&
123+
i + 1 < retries
124124
) {
125125
await delay(bo.backoff(i));
126126
} else {

Diff for: jetstream/src/jserrors.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ export class JetStreamStatus {
6060

6161
debug() {
6262
console.log({
63-
message: this.description,
63+
subject: this.msg.subject,
64+
reply: this.msg.reply,
65+
description: this.description,
6466
status: this.code,
6567
headers: this.msg.headers,
6668
});

Diff for: jetstream/src/pushconsumer.ts

+19-9
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import type {
4646
QueuedIterator,
4747
Status,
4848
Subscription,
49+
SubscriptionImpl,
4950
} from "@nats-io/nats-core/internal";
5051
import { JetStreamStatus } from "./jserrors.ts";
5152

@@ -281,9 +282,26 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
281282
}
282283
this.monitor?.work();
283284

284-
const isProtocol = msg.subject === subject;
285+
// need to make sure to catch all protocol messages even
286+
const isProtocol = this.ordered
287+
? msg.subject.indexOf(this?.deliverPrefix!) === 0
288+
: msg.subject === subject;
289+
285290
if (isProtocol) {
291+
if (msg.subject !== (this.sub as SubscriptionImpl).subject) {
292+
// this is a stale message - was not sent to the current inbox
293+
return;
294+
}
295+
286296
const status = new JetStreamStatus(msg);
297+
if (status.isFlowControlRequest()) {
298+
this._push(() => {
299+
msg.respond();
300+
this.notify(ConsumerDebugEvents.FlowControl, null);
301+
});
302+
return;
303+
}
304+
287305
if (status.isIdleHeartbeat()) {
288306
const natsLastConsumer = msg.headers?.get("Nats-Last-Consumer");
289307
const natsLastStream = msg.headers?.get("Nats-Last-Stream");
@@ -293,14 +311,6 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
293311
});
294312
return;
295313
}
296-
if (status.isFlowControlRequest()) {
297-
status.debug();
298-
this._push(() => {
299-
msg.respond();
300-
this.notify(ConsumerDebugEvents.FlowControl, null);
301-
});
302-
return;
303-
}
304314

305315
const code = status.code;
306316
const description = status.description;

Diff for: kv/deno.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,6 @@
3434
},
3535
"imports": {
3636
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34",
37-
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-21"
37+
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-22"
3838
}
39-
}
39+
}

Diff for: kv/import_map.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
"imports": {
33
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34",
44
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-34/internal",
5-
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-21",
6-
"@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-21/internal",
5+
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-22",
6+
"@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-22/internal",
77
"test_helpers": "../test_helpers/mod.ts",
88
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
99
"@nats-io/nuid": "jsr:@nats-io/[email protected]",
1010
"@std/io": "jsr:@std/[email protected]"
1111
}
12-
}
12+
}

Diff for: kv/package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
},
3535
"description": "kv library - this library implements all the base functionality for NATS KV javascript clients",
3636
"dependencies": {
37-
"@nats-io/jetstream": "3.0.0-21",
37+
"@nats-io/jetstream": "3.0.0-22",
3838
"@nats-io/nats-core": "3.0.0-34"
3939
},
4040
"devDependencies": {
@@ -43,4 +43,4 @@
4343
"typedoc": "^0.26.10",
4444
"typescript": "^5.6.3"
4545
}
46-
}
46+
}

Diff for: obj/deno.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,6 @@
3434
},
3535
"imports": {
3636
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34",
37-
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-21"
37+
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-22"
3838
}
39-
}
39+
}

Diff for: obj/import_map.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
"imports": {
33
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34",
44
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0-34/internal",
5-
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-21",
6-
"@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-21/internal",
5+
"@nats-io/jetstream": "jsr:@nats-io/jetstream@~3.0.0-22",
6+
"@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0-22/internal",
77
"test_helpers": "../test_helpers/mod.ts",
88
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
99
"@nats-io/nuid": "jsr:@nats-io/[email protected]",
1010
"@std/io": "jsr:@std/[email protected]"
1111
}
12-
}
12+
}

Diff for: obj/package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
},
3535
"description": "obj library - this library implements all the base functionality for NATS objectstore for javascript clients",
3636
"dependencies": {
37-
"@nats-io/jetstream": "3.0.0-21",
37+
"@nats-io/jetstream": "3.0.0-22",
3838
"@nats-io/nats-core": "3.0.0-34"
3939
},
4040
"devDependencies": {
@@ -43,4 +43,4 @@
4343
"typedoc": "^0.26.10",
4444
"typescript": "^5.6.3"
4545
}
46-
}
46+
}

Diff for: services/deno.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,4 @@
3535
"imports": {
3636
"@nats-io/nats-core": "jsr:@nats-io/nats-core@~3.0.0-34"
3737
}
38-
}
38+
}

Diff for: services/import_map.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@
77
"@nats-io/nuid": "jsr:@nats-io/[email protected]",
88
"@std/io": "jsr:@std/[email protected]"
99
}
10-
}
10+
}

Diff for: services/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,4 @@
4242
"typedoc": "^0.26.10",
4343
"typescript": "^5.6.3"
4444
}
45-
}
45+
}

Diff for: transport-deno/deno.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,4 @@
2424
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
2525
"@nats-io/nuid": "jsr:@nats-io/[email protected]"
2626
}
27-
}
27+
}

Diff for: transport-node/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
"nats-jwt": "^0.0.9",
6565
"shx": "^0.3.3",
6666
"typescript": "5.6.3",
67-
"@nats-io/jetstream": "3.0.0-21",
67+
"@nats-io/jetstream": "3.0.0-22",
6868
"@nats-io/kv": "3.0.0-16",
6969
"@nats-io/obj": "3.0.0-17"
7070
}

0 commit comments

Comments
 (0)