Skip to content
Merged
6 changes: 6 additions & 0 deletions .changeset/kind-mangos-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@rocket.chat/apps-engine': patch
'@rocket.chat/meteor': patch
---

Fixes the ping behavior so it only triggers when the app becomes idle, preventing unnecessary restarts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ export class DenoRuntimeSubprocessController extends EventEmitter implements IRu
await this.waitUntilReady();

await this.sendRequest({ method: 'app:construct', params: [this.appPackage] });

this.emit('constructed');
}

public async stopApp() {
Expand Down Expand Up @@ -660,6 +662,8 @@ export class DenoRuntimeSubprocessController extends EventEmitter implements IRu
throw new Error('Invalid message format');
}

this.emit('heartbeat');

if (JSONRPCMessage.type === 'request' || JSONRPCMessage.type === 'notification') {
this.handleIncomingMessage(JSONRPCMessage).catch((reason) =>
console.error(`[${this.getAppId()}] Error executing handler`, reason, message),
Expand Down
110 changes: 70 additions & 40 deletions packages/apps-engine/src/server/runtime/deno/LivenessManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import { EventEmitter } from 'stream';
import type { DenoRuntimeSubprocessController } from './AppsEngineDenoRuntime';
import type { ProcessMessenger } from './ProcessMessenger';

const COMMAND_PING = '_zPING';
export const COMMAND_PING = '_zPING';

const defaultOptions: LivenessManager['options'] = {
pingRequestTimeout: 1000,
pingFrequencyInMS: 10000,
pingTimeoutInMS: 1000,
pingIntervalInMS: 10000,
consecutiveTimeoutLimit: 4,
maxRestarts: Infinity,
restartAttemptDelayInMS: 1000,
Expand All @@ -27,10 +27,10 @@ export class LivenessManager {

private readonly options: {
// How long should we wait for a response to the ping request
pingRequestTimeout: number;
pingTimeoutInMS: number;

// How long is the delay between ping messages
pingFrequencyInMS: number;
pingIntervalInMS: number;

// Limit of times the process can timeout the ping response before we consider it as unresponsive
consecutiveTimeoutLimit: number;
Expand All @@ -44,6 +44,13 @@ export class LivenessManager {

private subprocess: ChildProcess;

private watchdogTimeout: NodeJS.Timeout | null = null;

private lastHeartbeatTimestamp = NaN;

// A promise tracking the current ping process - used mostly for testing
private pendingPing: Promise<boolean> | null;

// This is the perfect use-case for an AbortController, but it's experimental in Node 14.x
private pingAbortController: EventEmitter;

Expand All @@ -68,7 +75,11 @@ export class LivenessManager {

this.options = Object.assign({}, defaultOptions, options);

this.controller.on('ready', () => this.ping());
this.controller.on('heartbeat', () => {
this.lastHeartbeatTimestamp = Date.now();
this.pingTimeoutConsecutiveCount = 0;
});

this.controller.on('error', async (reason) => {
if (reason instanceof Error && reason.message.startsWith('DECODE_ERROR')) {
await this.restartProcess('Decode error', 'controller');
Expand All @@ -77,9 +88,10 @@ export class LivenessManager {
}

public getRuntimeData() {
const { restartCount, pingTimeoutConsecutiveCount, restartLog } = this;
const { lastHeartbeatTimestamp, restartCount, pingTimeoutConsecutiveCount, restartLog } = this;

return {
lastHeartbeatTimestamp,
restartCount,
pingTimeoutConsecutiveCount,
restartLog,
Expand All @@ -93,6 +105,40 @@ export class LivenessManager {

this.subprocess.once('exit', this.handleExit.bind(this));
this.subprocess.once('error', this.handleError.bind(this));

this.controller.once('constructed', this.start.bind(this));
}

public start() {
this.lastHeartbeatTimestamp = Date.now();

this.watchdogTimeout = setInterval(() => {
if (Date.now() - this.lastHeartbeatTimestamp < this.options.pingIntervalInMS) {
return;
}

try {
this.ping();
} catch {
// If the ping call fails synchronously, it's because we couldn't send the ping message
// then likely the process isn't running, so we stop everything
this.debug('[LivenessManager] Failed to send ping to subprocess, stopping watchdog...');
this.stop();
}
}, this.options.pingIntervalInMS);

this.watchdogTimeout.unref();
}

public stop() {
this.pingAbortController.emit('abort');
clearInterval(this.watchdogTimeout);
this.watchdogTimeout = null;
this.pendingPing = null;
}

public getPendingPing() {
return this.pendingPing;
}

/**
Expand All @@ -105,23 +151,14 @@ export class LivenessManager {
private ping() {
const start = Date.now();

let aborted = false;

const setAborted = () => {
this.debug('Ping aborted');

aborted = true;
};

// If we get an abort, ping should not continue
this.pingAbortController.once('abort', setAborted);

new Promise<void>((resolve, reject) => {
this.pendingPing = new Promise<boolean>((resolve, reject) => {
const onceCallback = () => {
this.debug('Ping successful in %d ms', Date.now() - start);
const now = Date.now();
this.debug('Ping successful in %d ms', now - start);
clearTimeout(timeoutId);
this.pingTimeoutConsecutiveCount = 0;
resolve();
this.lastHeartbeatTimestamp = now;
resolve(true);
};

const timeoutCallback = () => {
Expand All @@ -131,13 +168,17 @@ export class LivenessManager {
reject('timeout');
};

const timeoutId = setTimeout(timeoutCallback, this.options.pingRequestTimeout);
this.pingAbortController.once('abort', () => {
this.debug('Ping aborted');
reject('abort');
});

const timeoutId = setTimeout(timeoutCallback, this.options.pingTimeoutInMS);

this.controller.once('pong', onceCallback);
})
.then(() => !aborted)
.catch((reason) => {
if (aborted) {
if (reason === 'abort') {
return false;
}

Expand All @@ -152,18 +193,8 @@ export class LivenessManager {

return true;
})
.then((shouldContinue) => {
if (!shouldContinue) {
this.pingAbortController.off('abort', setAborted);
return;
}

setTimeout(() => {
if (aborted) return;

this.pingAbortController.off('abort', setAborted);
this.ping();
}, this.options.pingFrequencyInMS);
.finally(() => {
this.pingAbortController.removeAllListeners('abort');
});

this.messenger.send(COMMAND_PING);
Expand All @@ -175,8 +206,6 @@ export class LivenessManager {
}

private handleExit(exitCode: number, signal: string) {
this.pingAbortController.emit('abort');

const processState = this.controller.getProcessState();
// If the we're restarting the process, or want to stop the process, or it exited cleanly, nothing else for us to do
if (processState === 'restarting' || processState === 'stopped' || (exitCode === 0 && !signal)) {
Expand All @@ -185,7 +214,7 @@ export class LivenessManager {

let reason: string;

// Otherwise we try to restart the subprocess, if possible
// Otherwise we attempt to restart the process
if (signal) {
this.debug('App has been killed (%s). Attempting restart #%d...', signal, this.restartCount + 1);
reason = `App has been killed with signal ${signal}`;
Expand All @@ -198,6 +227,8 @@ export class LivenessManager {
}

private async restartProcess(reason: string, source = 'liveness-manager') {
this.stop();

if (this.restartCount >= this.options.maxRestarts) {
this.debug('Limit of restarts reached (%d). Aborting restart...', this.options.maxRestarts);
this.controller.stopApp();
Expand All @@ -218,7 +249,6 @@ export class LivenessManager {
setTimeout(() => this.restartProcess('Failed restart attempt'), this.options.restartAttemptDelayInMS);
}

this.pingTimeoutConsecutiveCount = 0;
this.restartCount++;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,25 @@ import type { ChildProcess } from 'child_process';

import type { JsonRpc } from 'jsonrpc-lite';

import type { COMMAND_PING } from './LivenessManager';
import type { Encoder } from './codec';
import { newEncoder } from './codec';

type Message = JsonRpc | typeof COMMAND_PING;

export class ProcessMessenger {
private deno: ChildProcess | undefined;

private encoder: Encoder | undefined;

private _sendStrategy: (message: JsonRpc) => void;
private _sendStrategy: (message: Message) => void;

constructor() {
this._sendStrategy = this.strategyError;
}

public get send() {
return this._sendStrategy.bind(this);
public send(message: Message) {
this._sendStrategy(message);
}

public setReceiver(deno: ChildProcess) {
Expand All @@ -44,11 +47,11 @@ export class ProcessMessenger {
}
}

private strategyError(_message: JsonRpc) {
private strategyError(_message: Message) {
throw new Error('No process configured to receive a message');
}

private strategySend(message: JsonRpc) {
private strategySend(message: Message) {
this.deno.stdin.write(this.encoder.encode(message));
}
}
Loading
Loading