Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: decoder error preventing succesfull app subprocess restart #34880

Merged
merged 2 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .changeset/cool-planes-protect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
'@rocket.chat/omnichannel-transcript': patch
'@rocket.chat/authorization-service': patch
'@rocket.chat/stream-hub-service': patch
'@rocket.chat/presence-service': patch
'@rocket.chat/fuselage-ui-kit': patch
'@rocket.chat/account-service': patch
'@rocket.chat/mock-providers': patch
'@rocket.chat/ui-theming': patch
'@rocket.chat/uikit-playground': patch
'@rocket.chat/ddp-streamer': patch
'@rocket.chat/queue-worker': patch
'@rocket.chat/apps-engine': patch
'@rocket.chat/ui-composer': patch
'@rocket.chat/ui-contexts': patch
'@rocket.chat/ui-client': patch
'@rocket.chat/models': patch
'@rocket.chat/sha256': patch
'@rocket.chat/meteor': patch
---

Fixes an issue that prevented the apps-engine from reestablishing communications with subprocesses in some cases
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import { LivenessManager } from './LivenessManager';
import { ProcessMessenger } from './ProcessMessenger';
import { bundleLegacyApp } from './bundler';
import { decoder } from './codec';
import { newDecoder } from './codec';

const baseDebug = debugFactory('appsEngine:runtime:deno');

Expand Down Expand Up @@ -112,7 +112,11 @@
private readonly livenessManager: LivenessManager;

// We need to keep the appSource around in case the Deno process needs to be restarted
constructor(manager: AppManager, private readonly appPackage: IParseAppPackageResult, private readonly storageItem: IAppStorageItem) {
constructor(

Check failure on line 115 in packages/apps-engine/src/server/runtime/deno/AppsEngineDenoRuntime.ts

View workflow job for this annotation

GitHub Actions / 🔎 Code Check / Code Lint

Replace `⏎········manager:·AppManager,⏎········private·readonly·appPackage:·IParseAppPackageResult,⏎········private·readonly·storageItem:·IAppStorageItem,⏎····` with `manager:·AppManager,·private·readonly·appPackage:·IParseAppPackageResult,·private·readonly·storageItem:·IAppStorageItem`
manager: AppManager,
private readonly appPackage: IParseAppPackageResult,
private readonly storageItem: IAppStorageItem,
) {
super();

this.debug = baseDebug.extend(appPackage.info.id);
Expand Down Expand Up @@ -388,6 +392,7 @@
console.error(`Failed to startup Deno subprocess for app ${this.getAppId()}`, err);
});
this.once('ready', this.onReady.bind(this));

this.parseStdout(this.deno.stdout);
}

Expand Down Expand Up @@ -609,51 +614,58 @@
}

private async parseStdout(stream: Readable): Promise<void> {
for await (const message of decoder.decodeStream(stream)) {
this.debug('Received message from subprocess %o', message);
try {
// Process PONG resonse first as it is not JSON RPC
if (message === COMMAND_PONG) {
this.emit('pong');
continue;
}

const JSONRPCMessage = jsonrpc.parseObject(message);

if (Array.isArray(JSONRPCMessage)) {
throw new Error('Invalid message format');
}

if (JSONRPCMessage.type === 'request' || JSONRPCMessage.type === 'notification') {
this.handleIncomingMessage(JSONRPCMessage).catch((reason) =>
console.error(`[${this.getAppId()}] Error executing handler`, reason, message),
);
continue;
}

if (JSONRPCMessage.type === 'success' || JSONRPCMessage.type === 'error') {
this.handleResultMessage(JSONRPCMessage).catch((reason) => console.error(`[${this.getAppId()}] Error executing handler`, reason, message));
continue;
}

console.error('Unrecognized message type', JSONRPCMessage);
} catch (e) {
// SyntaxError is thrown when the message is not a valid JSON
if (e instanceof SyntaxError) {
console.error(`[${this.getAppId()}] Failed to parse message`);
continue;
try {
for await (const message of newDecoder().decodeStream(stream)) {
this.debug('Received message from subprocess %o', message);
try {
// Process PONG resonse first as it is not JSON RPC
if (message === COMMAND_PONG) {
this.emit('pong');
continue;
}

const JSONRPCMessage = jsonrpc.parseObject(message);

if (Array.isArray(JSONRPCMessage)) {
throw new Error('Invalid message format');
}

if (JSONRPCMessage.type === 'request' || JSONRPCMessage.type === 'notification') {
this.handleIncomingMessage(JSONRPCMessage).catch((reason) =>
console.error(`[${this.getAppId()}] Error executing handler`, reason, message),
);
continue;
}

if (JSONRPCMessage.type === 'success' || JSONRPCMessage.type === 'error') {
this.handleResultMessage(JSONRPCMessage).catch((reason) =>
console.error(`[${this.getAppId()}] Error executing handler`, reason, message),
);
continue;
}

console.error('Unrecognized message type', JSONRPCMessage);
} catch (e) {
// SyntaxError is thrown when the message is not a valid JSON
if (e instanceof SyntaxError) {
console.error(`[${this.getAppId()}] Failed to parse message`);
continue;
}

console.error(`[${this.getAppId()}] Error executing handler`, e, message);
}

console.error(`[${this.getAppId()}] Error executing handler`, e, message);
}
} catch (e) {
console.error(`[${this.getAppId()}]`, e);
this.emit('error', new Error('DECODE_ERROR'));
}
}

private async parseError(chunk: Buffer): Promise<void> {
try {
const data = JSON.parse(chunk.toString());

this.debug('Metrics received from subprocess: %o', data);
this.debug('Metrics received from subprocess (via stderr): %o', data);
} catch (e) {
console.error('Subprocess stderr', chunk.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ export class LivenessManager {
this.pingAbortController = new EventEmitter();

this.options = Object.assign({}, defaultOptions, options);

this.controller.on('ready', () => this.ping());
this.controller.on('error', async (reason) => {
if (reason instanceof Error && reason.message.startsWith('DECODE_ERROR')) {
await this.restartProcess('Decode error', 'controller');
}
});
}

public getRuntimeData() {
Expand All @@ -84,7 +91,6 @@ export class LivenessManager {

this.pingTimeoutConsecutiveCount = 0;

this.controller.once('ready', () => this.ping());
this.subprocess.once('exit', this.handleExit.bind(this));
this.subprocess.once('error', this.handleError.bind(this));
}
Expand Down Expand Up @@ -188,7 +194,7 @@ export class LivenessManager {
this.restartProcess(reason);
}

private async restartProcess(reason: string) {
private async restartProcess(reason: string, source = 'liveness-manager') {
if (this.restartCount >= this.options.maxRestarts) {
this.debug('Limit of restarts reached (%d). Aborting restart...', this.options.maxRestarts);
this.controller.stopApp();
Expand All @@ -197,8 +203,8 @@ export class LivenessManager {

this.restartLog.push({
reason,
source,
restartedAt: new Date(),
source: 'liveness-manager',
pid: this.subprocess.pid,
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ import type { ChildProcess } from 'child_process';

import type { JsonRpc } from 'jsonrpc-lite';

import { encoder } from './codec';
import { type Encoder, newEncoder } from './codec';

export class ProcessMessenger {
private deno: ChildProcess | undefined;

private encoder: Encoder | undefined;

private _sendStrategy: (message: JsonRpc) => void;

constructor(private readonly debug: debug.Debugger) {
Expand All @@ -25,13 +27,17 @@ export class ProcessMessenger {

public clearReceiver() {
delete this.deno;
delete this.encoder;

this.switchStrategy();
}

private switchStrategy() {
if (this.deno?.stdin?.writable) {
this._sendStrategy = this.strategySend.bind(this);

// Get a clean encoder
this.encoder = newEncoder();
} else {
this._sendStrategy = this.strategyError.bind(this);
}
Expand All @@ -43,6 +49,6 @@ export class ProcessMessenger {

private strategySend(message: JsonRpc) {
this.debug('Sending message to subprocess %o', message);
this.deno.stdin.write(encoder.encode(message));
this.deno.stdin.write(this.encoder.encode(message));
}
}
22 changes: 19 additions & 3 deletions packages/apps-engine/src/server/runtime/deno/codec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Decoder, Encoder, ExtensionCodec } from '@msgpack/msgpack';
import { Decoder as _Decoder, Encoder as _Encoder, ExtensionCodec } from '@msgpack/msgpack';

const extensionCodec = new ExtensionCodec();

Expand All @@ -10,6 +10,7 @@ extensionCodec.register({
return new Uint8Array([0]);
}
},

decode: (_data: Uint8Array) => undefined,
});

Expand All @@ -21,9 +22,24 @@ extensionCodec.register({
return new Uint8Array(object.buffer, object.byteOffset, object.byteLength);
}
},

// msgpack will reuse the Uint8Array instance, so WE NEED to copy it instead of simply creating a view
decode: (data: Uint8Array) => Buffer.from(data),
});

export const encoder = new Encoder({ extensionCodec });
export const decoder = new Decoder({ extensionCodec });
/**
* The Encoder and Decoder classes perform "stateful" operations, i.e. they read from a
* stream, store the data locally and decode it from its buffer.
*
* In practice, this affects the decoder when there is decode error. After an error, the decoder
* keeps the malformed data in its buffer, and even if we try to decode from another source (e.g. different stream)
* it will fail again as there's still data in the buffer.
*
* For that reason, we can't have a singleton instance of Encoder and Decoder, but rather one
* instance for each time we create a new subprocess
*/
export const newEncoder = () => new _Encoder({ extensionCodec });
export const newDecoder = () => new _Decoder({ extensionCodec });

export type Encoder = _Encoder;
export type Decoder = _Decoder;
Loading