Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consumer heartbeats #327

Merged
merged 1 commit into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nats-base-client/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export enum ErrorCode {
JetStream409MaxAckPendingExceeded = "409",
JetStream409 = "409",
JetStreamNotEnabled = "503",
JetStreamIdleHeartBeat = "IDLE_HEARTBEAT",

// emitted by the server
AuthorizationViolation = "AUTHORIZATION_VIOLATION",
Expand Down
139 changes: 139 additions & 0 deletions nats-base-client/idleheartbeat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright 2022 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Called with the number of missed heartbeats.
* If the function returns true, the monitor will cancel monitoring.
*/
export type IdleHeartbeatFn = (n: number) => boolean;

/**
* IdleHeartbeatOptions
*/
export type IdleHeartbeatOptions = {
/**
* @field maxOut - optional maximum number of missed heartbeats before notifying (default is 2)
*/
maxOut: number;
/**
* @field cancelAfter - optional timer to auto cancel monitoring in millis
*/
cancelAfter: number;
};

export class IdleHeartbeat {
interval: number;
maxOut: number;
cancelAfter: number;
timer?: number;
autoCancelTimer?: number;
last!: number;
missed: number;
count: number;
callback: IdleHeartbeatFn;

/**
* Constructor
* @param interval in millis to check
* @param cb a callback to report when heartbeats are missed
* @param opts monitor options @see IdleHeartbeatOptions
*/
constructor(
interval: number,
cb: IdleHeartbeatFn,
opts: Partial<IdleHeartbeatOptions> = { maxOut: 2 },
) {
this.interval = interval;
this.maxOut = opts?.maxOut || 2;
this.cancelAfter = opts?.cancelAfter || 0;
this.last = Date.now();
this.missed = 0;
this.count = 0;
this.callback = cb;

this._schedule();
}

/**
* cancel monitoring
*/
cancel() {
if (this.autoCancelTimer) {
clearTimeout(this.autoCancelTimer);
}
if (this.timer) {
clearInterval(this.timer);
}
this.timer = 0;
this.autoCancelTimer = 0;
}

/**
* work signals that there was work performed
*/
work() {
this.last = Date.now();
this.missed = 0;
}

/**
* internal api to change the interval, cancelAfter and maxOut
* @param interval
* @param cancelAfter
* @param maxOut
*/
_change(interval: number, cancelAfter = 0, maxOut = 2) {
this.interval = interval;
this.maxOut = maxOut;
this.cancelAfter = cancelAfter;
this.restart();
}

/**
* cancels and restarts the monitoring
*/
restart() {
this.cancel();
this._schedule();
}

/**
* internal api called to start monitoring
*/
_schedule() {
if (this.cancelAfter > 0) {
// @ts-ignore: in node is not a number - we treat this opaquely
this.autoCancelTimer = setTimeout(() => {
this.cancel();
}, this.cancelAfter);
}
// @ts-ignore: in node is not a number - we treat this opaquely
this.timer = setInterval(() => {
this.count++;
if (Date.now() - this.interval > this.last) {
this.missed++;
}
if (this.missed >= this.maxOut) {
try {
if (this.callback(this.missed) === true) {
this.cancel();
}
} catch (err) {
console.log(err);
}
}
}, this.interval);
}
}
123 changes: 121 additions & 2 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ import {
isFlowControlMsg,
isHeartbeatMsg,
isTerminal409,
Js409Errors,
millis,
nanos,
newJsErrorMsg,
validateDurableName,
validateStreamName,
} from "./jsutil.ts";
Expand All @@ -80,6 +83,7 @@ import { Bucket } from "./kv.ts";
import { NatsConnectionImpl } from "./nats.ts";
import { Feature } from "./semver.ts";
import { ObjectStoreImpl } from "./objectstore.ts";
import { IdleHeartbeat } from "./idleheartbeat.ts";

export interface JetStreamSubscriptionInfoable {
info: JetStreamSubscriptionInfo | null;
Expand Down Expand Up @@ -243,6 +247,7 @@ export class JetStreamClientImpl extends BaseApiClient
const trackBytes = (opts.max_bytes ?? 0) > 0;
let receivedBytes = 0;
const max_bytes = trackBytes ? opts.max_bytes! : 0;
let monitor: IdleHeartbeat | null = null;

const args: Partial<PullOptions> = {};
args.batch = opts.batch || 1;
Expand All @@ -266,10 +271,27 @@ export class JetStreamClientImpl extends BaseApiClient
if (expires === 0 && args.no_wait === false) {
throw new Error("expires or no_wait is required");
}
const hb = opts.idle_heartbeat || 0;
if (hb) {
args.idle_heartbeat = nanos(hb);
//@ts-ignore: for testing
if (opts.delay_heartbeat === true) {
//@ts-ignore: test option
args.idle_heartbeat = nanos(hb * 4);
}
}

const qi = new QueuedIteratorImpl<JsMsg>();
const wants = args.batch;
let received = 0;
qi.protocolFilterFn = (jm, _ingest = false): boolean => {
const jsmi = jm as JsMsgImpl;
if (isHeartbeatMsg(jsmi.msg)) {
monitor?.work();
return false;
}
return true;
};
// FIXME: this looks weird, we want to stop the iterator
// but doing it from a dispatchedFn...
qi.dispatchedFn = (m: JsMsg | null) => {
Expand Down Expand Up @@ -311,6 +333,8 @@ export class JetStreamClientImpl extends BaseApiClient
qi.stop(err);
}
} else {
// if we are doing heartbeats, message resets
monitor?.work();
qi.received++;
qi.push(toJsMsg(msg));
}
Expand All @@ -327,16 +351,40 @@ export class JetStreamClientImpl extends BaseApiClient
sub.drain();
timer = null;
}
if (monitor) {
monitor.cancel();
}
});
}

(async () => {
try {
if (hb) {
monitor = new IdleHeartbeat(hb, (v: number): boolean => {
//@ts-ignore: pushing a fn
qi.push(() => {
// this will terminate the iterator
qi.err = new NatsError(
`${Js409Errors.IdleHeartbeatMissed}: ${v}`,
ErrorCode.JetStreamIdleHeartBeat,
);
});
return true;
});
}
} catch (_err) {
// ignore it
}

// close the iterator if the connection or subscription closes unexpectedly
await (sub as SubscriptionImpl).closed;
if (timer !== null) {
timer.cancel();
timer = null;
}
if (monitor) {
monitor.cancel();
}
qi.stop();
})().catch();

Expand Down Expand Up @@ -413,6 +461,9 @@ export class JetStreamClientImpl extends BaseApiClient
sub.unsubscribe();
throw err;
}

sub._maybeSetupHbMonitoring();

return sub;
}

Expand Down Expand Up @@ -594,10 +645,14 @@ export class JetStreamClientImpl extends BaseApiClient
return (jm: JsMsg | null, ctx?: unknown): IngestionFilterFnResult => {
// ctx is expected to be the iterator (the JetstreamSubscriptionImpl)
const jsub = ctx as JetStreamSubscriptionImpl;

// this shouldn't happen
if (!jm) return { ingest: false, protocol: false };

const jmi = jm as JsMsgImpl;
if (!checkJsError(jmi.msg)) {
jsub.monitor?.work();
}
if (isHeartbeatMsg(jmi.msg)) {
const ingest = ordered ? jsub._checkHbOrderConsumer(jmi.msg) : true;
if (!ordered) {
Expand All @@ -614,9 +669,10 @@ export class JetStreamClientImpl extends BaseApiClient
}
}

class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
export class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
implements JetStreamSubscriptionInfoable, Destroyable, ConsumerInfoable {
js: BaseApiClient;
monitor: IdleHeartbeat | null;

constructor(
js: BaseApiClient,
Expand All @@ -625,6 +681,13 @@ class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
) {
super(js.nc, subject, opts);
this.js = js;
this.monitor = null;

this.sub.closed.then(() => {
if (this.monitor) {
this.monitor.cancel();
}
});
}

set info(info: JetStreamSubscriptionInfo | null) {
Expand Down Expand Up @@ -667,6 +730,36 @@ class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
});
}

// this is called by push subscriptions, to initialize the monitoring
// if configured on the consumer
_maybeSetupHbMonitoring() {
const ns = this.info?.config?.idle_heartbeat || 0;
if (ns) {
this._setupHbMonitoring(millis(ns));
}
}

_setupHbMonitoring(millis: number, cancelAfter = 0) {
const opts = { cancelAfter: 0, maxOut: 2 };
if (cancelAfter) {
opts.cancelAfter = cancelAfter;
}
const sub = this.sub as SubscriptionImpl;
const handler = (v: number): boolean => {
const msg = newJsErrorMsg(
409,
`${Js409Errors.IdleHeartbeatMissed}: ${v}`,
this.sub.subject,
);
this.sub.callback(null, msg);
// if we are a handler, we'll continue reporting
// iterators will stop
return !sub.noIterator;
};
// this only applies for push subscriptions
this.monitor = new IdleHeartbeat(millis, handler, opts);
}

_checkHbOrderConsumer(msg: Msg): boolean {
const rm = msg.headers!.get(JsHeaders.ConsumerStalledHdr);
if (rm !== "") {
Expand Down Expand Up @@ -743,11 +836,37 @@ class JetStreamPullSubscriptionImpl extends JetStreamSubscriptionImpl
args.max_bytes = opts.max_bytes!;
}

let expires = 0;
if (opts.expires && opts.expires > 0) {
args.expires = nanos(opts.expires);
expires = opts.expires;
args.expires = nanos(expires);
}

let hb = 0;
if (opts.idle_heartbeat && opts.idle_heartbeat > 0) {
hb = opts.idle_heartbeat;
args.idle_heartbeat = nanos(hb);
}

if (hb && expires === 0) {
throw new Error("idle_heartbeat requires expires");
}
if (hb > expires) {
throw new Error("expires must be greater than idle_heartbeat");
}

if (this.info) {
if (this.monitor) {
this.monitor.cancel();
}
if (expires && hb) {
if (!this.monitor) {
this._setupHbMonitoring(hb, expires);
} else {
this.monitor._change(hb, expires);
}
}

const api = (this.info.api as BaseApiClient);
const subj = `${api.prefix}.CONSUMER.MSG.NEXT.${stream}.${consumer}`;
const reply = this.sub.subject;
Expand Down
Loading