From 121dc833c58911b34bf02659a8328e6e61f6dd2e Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Thu, 7 Jul 2022 10:58:56 -0500 Subject: [PATCH] [FEAT] added optional heartbeat checks to js#fetch() [FEAT] added optional heartbeat checks to js#subscribe() --- nats-base-client/error.ts | 1 + nats-base-client/idleheartbeat.ts | 139 ++++++++++++++++++++++++++++ nats-base-client/jsclient.ts | 123 +++++++++++++++++++++++- nats-base-client/jsutil.ts | 32 ++++++- nats-base-client/queued_iterator.ts | 4 +- nats-base-client/types.ts | 1 + tests/idleheartbeats_test.ts | 109 ++++++++++++++++++++++ tests/jetstream_test.ts | 134 +++++++++++++++++++++++++++ 8 files changed, 537 insertions(+), 6 deletions(-) create mode 100644 nats-base-client/idleheartbeat.ts create mode 100644 tests/idleheartbeats_test.ts diff --git a/nats-base-client/error.ts b/nats-base-client/error.ts index 9cf192ac..b405ae02 100644 --- a/nats-base-client/error.ts +++ b/nats-base-client/error.ts @@ -52,6 +52,7 @@ export enum ErrorCode { JetStream409MaxAckPendingExceeded = "409", JetStream409 = "409", JetStreamNotEnabled = "503", + JetStreamIdleHeartBeat = "IDLE_HEARTBEAT", // emitted by the server AuthorizationViolation = "AUTHORIZATION_VIOLATION", diff --git a/nats-base-client/idleheartbeat.ts b/nats-base-client/idleheartbeat.ts new file mode 100644 index 00000000..b2fa5d71 --- /dev/null +++ b/nats-base-client/idleheartbeat.ts @@ -0,0 +1,139 @@ +/* + * Copyright 2022 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Called with the number of missed heartbeats. + * If the function returns true, the monitor will cancel monitoring. + */ +export type IdleHeartbeatFn = (n: number) => boolean; + +/** + * IdleHeartbeatOptions + */ +export type IdleHeartbeatOptions = { + /** + * @field maxOut - optional maximum number of missed heartbeats before notifying (default is 2) + */ + maxOut: number; + /** + * @field cancelAfter - optional timer to auto cancel monitoring in millis + */ + cancelAfter: number; +}; + +export class IdleHeartbeat { + interval: number; + maxOut: number; + cancelAfter: number; + timer?: number; + autoCancelTimer?: number; + last!: number; + missed: number; + count: number; + callback: IdleHeartbeatFn; + + /** + * Constructor + * @param interval in millis to check + * @param cb a callback to report when heartbeats are missed + * @param opts monitor options @see IdleHeartbeatOptions + */ + constructor( + interval: number, + cb: IdleHeartbeatFn, + opts: Partial = { maxOut: 2 }, + ) { + this.interval = interval; + this.maxOut = opts?.maxOut || 2; + this.cancelAfter = opts?.cancelAfter || 0; + this.last = Date.now(); + this.missed = 0; + this.count = 0; + this.callback = cb; + + this._schedule(); + } + + /** + * cancel monitoring + */ + cancel() { + if (this.autoCancelTimer) { + clearTimeout(this.autoCancelTimer); + } + if (this.timer) { + clearInterval(this.timer); + } + this.timer = 0; + this.autoCancelTimer = 0; + } + + /** + * work signals that there was work performed + */ + work() { + this.last = Date.now(); + this.missed = 0; + } + + /** + * internal api to change the interval, cancelAfter and maxOut + * @param interval + * @param cancelAfter + * @param maxOut + */ + _change(interval: number, cancelAfter = 0, maxOut = 2) { + this.interval = interval; + this.maxOut = maxOut; + this.cancelAfter = cancelAfter; + this.restart(); + } + + /** + * 
cancels and restarts the monitoring + */ + restart() { + this.cancel(); + this._schedule(); + } + + /** + * internal api called to start monitoring + */ + _schedule() { + if (this.cancelAfter > 0) { + // @ts-ignore: in node is not a number - we treat this opaquely + this.autoCancelTimer = setTimeout(() => { + this.cancel(); + }, this.cancelAfter); + } + // @ts-ignore: in node is not a number - we treat this opaquely + this.timer = setInterval(() => { + this.count++; + if (Date.now() - this.interval > this.last) { + this.missed++; + } + if (this.missed >= this.maxOut) { + try { + if (this.callback(this.missed) === true) { + this.cancel(); + } + } catch (err) { + console.log(err); + } + } + }, this.interval); + } +} diff --git a/nats-base-client/jsclient.ts b/nats-base-client/jsclient.ts index 6349a042..dbfe20bf 100644 --- a/nats-base-client/jsclient.ts +++ b/nats-base-client/jsclient.ts @@ -53,7 +53,10 @@ import { isFlowControlMsg, isHeartbeatMsg, isTerminal409, + Js409Errors, + millis, nanos, + newJsErrorMsg, validateDurableName, validateStreamName, } from "./jsutil.ts"; @@ -80,6 +83,7 @@ import { Bucket } from "./kv.ts"; import { NatsConnectionImpl } from "./nats.ts"; import { Feature } from "./semver.ts"; import { ObjectStoreImpl } from "./objectstore.ts"; +import { IdleHeartbeat } from "./idleheartbeat.ts"; export interface JetStreamSubscriptionInfoable { info: JetStreamSubscriptionInfo | null; @@ -243,6 +247,7 @@ export class JetStreamClientImpl extends BaseApiClient const trackBytes = (opts.max_bytes ?? 0) > 0; let receivedBytes = 0; const max_bytes = trackBytes ? opts.max_bytes! 
: 0; + let monitor: IdleHeartbeat | null = null; const args: Partial = {}; args.batch = opts.batch || 1; @@ -266,10 +271,27 @@ export class JetStreamClientImpl extends BaseApiClient if (expires === 0 && args.no_wait === false) { throw new Error("expires or no_wait is required"); } + const hb = opts.idle_heartbeat || 0; + if (hb) { + args.idle_heartbeat = nanos(hb); + //@ts-ignore: for testing + if (opts.delay_heartbeat === true) { + //@ts-ignore: test option + args.idle_heartbeat = nanos(hb * 4); + } + } const qi = new QueuedIteratorImpl(); const wants = args.batch; let received = 0; + qi.protocolFilterFn = (jm, _ingest = false): boolean => { + const jsmi = jm as JsMsgImpl; + if (isHeartbeatMsg(jsmi.msg)) { + monitor?.work(); + return false; + } + return true; + }; // FIXME: this looks weird, we want to stop the iterator // but doing it from a dispatchedFn... qi.dispatchedFn = (m: JsMsg | null) => { @@ -311,6 +333,8 @@ export class JetStreamClientImpl extends BaseApiClient qi.stop(err); } } else { + // if we are doing heartbeats, message resets + monitor?.work(); qi.received++; qi.push(toJsMsg(msg)); } @@ -327,16 +351,40 @@ export class JetStreamClientImpl extends BaseApiClient sub.drain(); timer = null; } + if (monitor) { + monitor.cancel(); + } }); } (async () => { + try { + if (hb) { + monitor = new IdleHeartbeat(hb, (v: number): boolean => { + //@ts-ignore: pushing a fn + qi.push(() => { + // this will terminate the iterator + qi.err = new NatsError( + `${Js409Errors.IdleHeartbeatMissed}: ${v}`, + ErrorCode.JetStreamIdleHeartBeat, + ); + }); + return true; + }); + } + } catch (_err) { + // ignore it + } + // close the iterator if the connection or subscription closes unexpectedly await (sub as SubscriptionImpl).closed; if (timer !== null) { timer.cancel(); timer = null; } + if (monitor) { + monitor.cancel(); + } qi.stop(); })().catch(); @@ -413,6 +461,9 @@ export class JetStreamClientImpl extends BaseApiClient sub.unsubscribe(); throw err; } + + 
sub._maybeSetupHbMonitoring(); + return sub; } @@ -594,10 +645,14 @@ export class JetStreamClientImpl extends BaseApiClient return (jm: JsMsg | null, ctx?: unknown): IngestionFilterFnResult => { // ctx is expected to be the iterator (the JetstreamSubscriptionImpl) const jsub = ctx as JetStreamSubscriptionImpl; + // this shouldn't happen if (!jm) return { ingest: false, protocol: false }; const jmi = jm as JsMsgImpl; + if (!checkJsError(jmi.msg)) { + jsub.monitor?.work(); + } if (isHeartbeatMsg(jmi.msg)) { const ingest = ordered ? jsub._checkHbOrderConsumer(jmi.msg) : true; if (!ordered) { @@ -614,9 +669,10 @@ export class JetStreamClientImpl extends BaseApiClient } } -class JetStreamSubscriptionImpl extends TypedSubscription +export class JetStreamSubscriptionImpl extends TypedSubscription implements JetStreamSubscriptionInfoable, Destroyable, ConsumerInfoable { js: BaseApiClient; + monitor: IdleHeartbeat | null; constructor( js: BaseApiClient, @@ -625,6 +681,13 @@ class JetStreamSubscriptionImpl extends TypedSubscription ) { super(js.nc, subject, opts); this.js = js; + this.monitor = null; + + this.sub.closed.then(() => { + if (this.monitor) { + this.monitor.cancel(); + } + }); } set info(info: JetStreamSubscriptionInfo | null) { @@ -667,6 +730,36 @@ class JetStreamSubscriptionImpl extends TypedSubscription }); } + // this is called by push subscriptions, to initialize the monitoring + // if configured on the consumer + _maybeSetupHbMonitoring() { + const ns = this.info?.config?.idle_heartbeat || 0; + if (ns) { + this._setupHbMonitoring(millis(ns)); + } + } + + _setupHbMonitoring(millis: number, cancelAfter = 0) { + const opts = { cancelAfter: 0, maxOut: 2 }; + if (cancelAfter) { + opts.cancelAfter = cancelAfter; + } + const sub = this.sub as SubscriptionImpl; + const handler = (v: number): boolean => { + const msg = newJsErrorMsg( + 409, + `${Js409Errors.IdleHeartbeatMissed}: ${v}`, + this.sub.subject, + ); + this.sub.callback(null, msg); + // if we are a 
handler, we'll continue reporting + // iterators will stop + return !sub.noIterator; + }; + // this only applies for push subscriptions + this.monitor = new IdleHeartbeat(millis, handler, opts); + } + _checkHbOrderConsumer(msg: Msg): boolean { const rm = msg.headers!.get(JsHeaders.ConsumerStalledHdr); if (rm !== "") { @@ -743,11 +836,37 @@ class JetStreamPullSubscriptionImpl extends JetStreamSubscriptionImpl args.max_bytes = opts.max_bytes!; } + let expires = 0; if (opts.expires && opts.expires > 0) { - args.expires = nanos(opts.expires); + expires = opts.expires; + args.expires = nanos(expires); + } + + let hb = 0; + if (opts.idle_heartbeat && opts.idle_heartbeat > 0) { + hb = opts.idle_heartbeat; + args.idle_heartbeat = nanos(hb); + } + + if (hb && expires === 0) { + throw new Error("idle_heartbeat requires expires"); + } + if (hb > expires) { + throw new Error("expires must be greater than idle_heartbeat"); } if (this.info) { + if (this.monitor) { + this.monitor.cancel(); + } + if (expires && hb) { + if (!this.monitor) { + this._setupHbMonitoring(hb, expires); + } else { + this.monitor._change(hb, expires); + } + } + const api = (this.info.api as BaseApiClient); const subj = `${api.prefix}.CONSUMER.MSG.NEXT.${stream}.${consumer}`; const reply = this.sub.subject; diff --git a/nats-base-client/jsutil.ts b/nats-base-client/jsutil.ts index 57f14fa9..fec884bc 100644 --- a/nats-base-client/jsutil.ts +++ b/nats-base-client/jsutil.ts @@ -16,11 +16,16 @@ import { AckPolicy, ConsumerConfig, DeliverPolicy, + Empty, Msg, Nanos, ReplayPolicy, } from "./types.ts"; import { ErrorCode, NatsError } from "./error.ts"; +import { MsgArg } from "./parser.ts"; +import { headers, MsgHdrsImpl } from "./headers.ts"; +import { MsgImpl } from "./msg.ts"; +import { Publisher } from "./protocol.ts"; export function validateDurableName(name?: string) { return validateName("durable", name); @@ -96,6 +101,23 @@ export function isHeartbeatMsg(msg: Msg): boolean { return isFlowControlMsg(msg) && 
msg.headers?.description === "Idle Heartbeat"; } +export function newJsErrorMsg( + code: number, + description: string, + subject: string, +): Msg { + const h = headers() as MsgHdrsImpl; + h.code = code; + h.description = description; + + const arg = { hdr: 1, sid: 0, size: 0 } as MsgArg; + const msg = new MsgImpl(arg, Empty, {} as Publisher); + msg._headers = h; + msg._subject = subject; + + return msg; +} + export function checkJsError(msg: Msg): NatsError | null { // JS error only if no payload - otherwise assume it is application data if (msg.data.length !== 0) { @@ -115,6 +137,7 @@ export enum Js409Errors { MaxMessageSizeExceeded = "message size exceeds maxbytes", PushConsumer = "consumer is push based", MaxWaitingExceeded = "exceeded maxwaiting", // not terminal + IdleHeartbeatMissed = "`idle heartbeats missed`", } let MAX_WAITING_FAIL = false; @@ -132,6 +155,7 @@ export function isTerminal409(err: NatsError): boolean { Js409Errors.MaxBytesExceeded, Js409Errors.MaxMessageSizeExceeded, Js409Errors.PushConsumer, + Js409Errors.IdleHeartbeatMissed, ]; if (MAX_WAITING_FAIL) { fatal.push(Js409Errors.MaxWaitingExceeded); @@ -157,14 +181,18 @@ export function checkJsErrorCode( return new NatsError(description, ErrorCode.JetStream404NoMessages); case 408: return new NatsError(description, ErrorCode.JetStream408RequestTimeout); - case 409: + case 409: { // the description can be exceeded max waiting or max ack pending, which are // recoverable, but can also be terminal errors where the request exceeds // some value in the consumer configuration + const ec = description.startsWith(Js409Errors.IdleHeartbeatMissed) + ? 
ErrorCode.JetStreamIdleHeartBeat + : ErrorCode.JetStream409; return new NatsError( description, - ErrorCode.JetStream409, + ec, ); + } case 503: return NatsError.errorForCode( ErrorCode.JetStreamNotEnabled, diff --git a/nats-base-client/queued_iterator.ts b/nats-base-client/queued_iterator.ts index ff18e293..736bedcc 100644 --- a/nats-base-client/queued_iterator.ts +++ b/nats-base-client/queued_iterator.ts @@ -65,7 +65,7 @@ export class QueuedIteratorImpl implements QueuedIterator { processed: number; // FIXME: this is updated by the protocol received: number; - protected noIterator: boolean; + noIterator: boolean; iterClosed: Deferred; protected done: boolean; private signal: Deferred; @@ -77,7 +77,7 @@ export class QueuedIteratorImpl implements QueuedIterator { dispatchedFn?: DispatchedFn; ctx?: unknown; _data?: unknown; //data is for use by extenders in any way they like - private err?: Error; + err?: Error; constructor() { this.inflight = 0; diff --git a/nats-base-client/types.ts b/nats-base-client/types.ts index 85ea9b58..b3c9b043 100644 --- a/nats-base-client/types.ts +++ b/nats-base-client/types.ts @@ -774,6 +774,7 @@ export interface PullOptions { * number of messages in the batch to fit within this setting. */ "max_bytes": number; + "idle_heartbeat": number; } /** diff --git a/tests/idleheartbeats_test.ts b/tests/idleheartbeats_test.ts new file mode 100644 index 00000000..5dbdd795 --- /dev/null +++ b/tests/idleheartbeats_test.ts @@ -0,0 +1,109 @@ +/* + * Copyright 2022 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { IdleHeartbeat } from "../nats-base-client/idleheartbeat.ts"; +import { + assert, + assertEquals, +} from "https://deno.land/std@0.136.0/testing/asserts.ts"; +import { deferred } from "../nats-base-client/util.ts"; + +Deno.test("idleheartbeat - basic", async () => { + const d = deferred(); + const h = new IdleHeartbeat(250, () => { + d.reject(new Error("didn't expect to notify")); + return true; + }); + let count = 0; + const timer = setInterval(() => { + count++; + h.work(); + if (count === 8) { + clearInterval(timer); + h.cancel(); + d.resolve(); + } + }, 100); + + await d; +}); + +Deno.test("idleheartbeat - timeout", async () => { + const d = deferred(); + new IdleHeartbeat(250, (v: number): boolean => { + d.resolve(v); + return true; + }, { maxOut: 1 }); + assertEquals(await d, 1); +}); + +Deno.test("idleheartbeat - timeout maxOut", async () => { + const d = deferred(); + new IdleHeartbeat(250, (v: number): boolean => { + d.resolve(v); + return true; + }, { maxOut: 5 }); + assertEquals(await d, 5); +}); + +Deno.test("idleheartbeat - timeout recover", async () => { + const d = deferred(); + const h = new IdleHeartbeat(250, (_v: number): boolean => { + d.reject(new Error("didn't expect to fail")); + return true; + }, { maxOut: 5 }); + + const interval = setInterval(() => { + h.work(); + }, 1000); + + setTimeout(() => { + h.cancel(); + d.resolve(); + clearInterval(interval); + }, 1650); + + await d; + assertEquals(h.missed, 2); +}); + +Deno.test("idleheartbeat - timeout autocancel", async () => { + const d = deferred(); + const h = new IdleHeartbeat(250, (_v: number): boolean => { + d.reject(new Error("didn't expect to fail")); + return true; + }, { maxOut: 4, cancelAfter: 2000 }); + + assert(h.autoCancelTimer); + + let t = 0; + const timer = setInterval(() => { + h.work(); + t++; + if (t === 20) { + clearInterval(timer); + d.resolve(); + } + 
}, 100); + + // we are not canceling the monitor, as the test will catch + // any resource leaks for a timer if not cleared. + + await d; + assert(h.count >= 7); + assertEquals(h.cancelAfter, 2000); + assertEquals(h.timer, 0); + assertEquals(h.autoCancelTimer, 0); +}); diff --git a/tests/jetstream_test.ts b/tests/jetstream_test.ts index 33ccc378..224f548c 100644 --- a/tests/jetstream_test.ts +++ b/tests/jetstream_test.ts @@ -60,6 +60,7 @@ import { assert } from "../nats-base-client/denobuffer.ts"; import { PubAck, RepublishHeaders } from "../nats-base-client/types.ts"; import { JetStreamClientImpl, + JetStreamSubscriptionImpl, JetStreamSubscriptionInfoable, } from "../nats-base-client/jsclient.ts"; import { defaultJsOptions } from "../nats-base-client/jsbaseclient_api.ts"; @@ -69,8 +70,10 @@ import { assertBetween, disabled, Lock, notCompatible } from "./helpers/mod.ts"; import { isFlowControlMsg, isHeartbeatMsg, + Js409Errors, } from "../nats-base-client/jsutil.ts"; import { Features } from "../nats-base-client/semver.ts"; +import { assertIsError } from "https://deno.land/std@0.138.0/testing/asserts.ts"; function callbackConsume(debug = false): JsMsgCallback { return (err: NatsError | null, jm: JsMsg | null) => { @@ -3346,6 +3349,137 @@ Deno.test("jetstream - pull consumer max_bytes rejected on old servers", async ( await cleanup(ns, nc); }); +Deno.test("jetstream - idleheartbeat missed on fetch", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + const { stream } = await initStream(nc); + const jsm = await nc.jetstreamManager(); + + await jsm.consumers.add(stream, { + durable_name: "me", + ack_policy: AckPolicy.Explicit, + }); + + const js = nc.jetstream(); + const iter = js.fetch(stream, "me", { + expires: 2000, + idle_heartbeat: 250, + //@ts-ignore: testing + delay_heartbeat: true, + }); + + await assertRejects( + async () => { + for await (const _m of iter) { + // no message expected + } + }, + NatsError, + 
Js409Errors.IdleHeartbeatMissed, + ); + + await cleanup(ns, nc); +}); + +Deno.test("jetstream - idleheartbeat on fetch", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + const { stream } = await initStream(nc); + const jsm = await nc.jetstreamManager(); + + await jsm.consumers.add(stream, { + durable_name: "me", + ack_policy: AckPolicy.Explicit, + }); + + const js = nc.jetstream(); + const iter = js.fetch(stream, "me", { + expires: 2000, + idle_heartbeat: 250, + }); + + // we don't expect this to throw + await (async () => { + for await (const _m of iter) { + // no message expected + } + })(); + + await cleanup(ns, nc); +}); + +Deno.test("jetstream - idleheartbeats errors repeat in callback push sub", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + const { subj } = await initStream(nc); + + const js = nc.jetstream(); + await js.publish(subj, Empty); + + const buf: NatsError[] = []; + + const d = deferred(); + const fn = (err: NatsError | null, _msg: JsMsg | null): void => { + if (err) { + buf.push(err); + if (buf.length === 3) { + d.resolve(); + } + } + }; + + const opts = consumerOpts(); + opts.durable("me"); + opts.manualAck(); + opts.ackExplicit(); + opts.idleHeartbeat(800); + opts.deliverTo(createInbox()); + opts.callback(fn); + + const sub = await js.subscribe(subj, opts) as JetStreamSubscriptionImpl; + assert(sub.monitor); + await delay(3000); + sub.monitor._change(100, 0, 3); + + buf.forEach((err) => { + assertIsError(err, NatsError, Js409Errors.IdleHeartbeatMissed); + }); + + assertEquals(sub.sub.isClosed(), false); + + await cleanup(ns, nc); +}); + +Deno.test("jetstream - idleheartbeats errors in iterator push sub", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + const { subj } = await initStream(nc); + + const opts = consumerOpts(); + opts.durable("me"); + opts.manualAck(); + opts.ackExplicit(); + opts.idleHeartbeat(800); + opts.deliverTo(createInbox()); + + 
const js = nc.jetstream(); + const sub = await js.subscribe(subj, opts) as JetStreamSubscriptionImpl; + + const d = deferred(); + (async () => { + for await (const _m of sub) { + // not going to get anything + } + })().catch((err) => { + d.resolve(err); + }); + assert(sub.monitor); + await delay(1700); + sub.monitor._change(100, 0, 1); + const err = await d; + assertIsError(err, NatsError, Js409Errors.IdleHeartbeatMissed); + assertEquals(err.code, ErrorCode.JetStreamIdleHeartBeat); + assertEquals(sub.sub.isClosed(), true); + + await cleanup(ns, nc); +}); + Deno.test("jetstream - bind ephemeral can get consumer info", async () => { const { ns, nc } = await setup(jetstreamServerConf({}, true)); const { stream, subj } = await initStream(nc);