diff --git a/.changeset/cool-planes-protect.md b/.changeset/cool-planes-protect.md new file mode 100644 index 000000000000..f5b5a7fa665c --- /dev/null +++ b/.changeset/cool-planes-protect.md @@ -0,0 +1,22 @@ +--- +'@rocket.chat/omnichannel-transcript': patch +'@rocket.chat/authorization-service': patch +'@rocket.chat/stream-hub-service': patch +'@rocket.chat/presence-service': patch +'@rocket.chat/fuselage-ui-kit': patch +'@rocket.chat/account-service': patch +'@rocket.chat/mock-providers': patch +'@rocket.chat/ui-theming': patch +'@rocket.chat/uikit-playground': patch +'@rocket.chat/ddp-streamer': patch +'@rocket.chat/queue-worker': patch +'@rocket.chat/apps-engine': patch +'@rocket.chat/ui-composer': patch +'@rocket.chat/ui-contexts': patch +'@rocket.chat/ui-client': patch +'@rocket.chat/models': patch +'@rocket.chat/sha256': patch +'@rocket.chat/meteor': patch +--- + +Fixes an issue that prevented the apps-engine from reestablishing communications with subprocesses in some cases diff --git a/packages/apps-engine/src/server/runtime/deno/AppsEngineDenoRuntime.ts b/packages/apps-engine/src/server/runtime/deno/AppsEngineDenoRuntime.ts index daa4b442b481..277ca15373c0 100644 --- a/packages/apps-engine/src/server/runtime/deno/AppsEngineDenoRuntime.ts +++ b/packages/apps-engine/src/server/runtime/deno/AppsEngineDenoRuntime.ts @@ -16,7 +16,7 @@ import type { AppLogStorage, IAppStorageItem } from '../../storage'; import { LivenessManager } from './LivenessManager'; import { ProcessMessenger } from './ProcessMessenger'; import { bundleLegacyApp } from './bundler'; -import { decoder } from './codec'; +import { newDecoder } from './codec'; const baseDebug = debugFactory('appsEngine:runtime:deno'); @@ -112,7 +112,11 @@ export class DenoRuntimeSubprocessController extends EventEmitter { private readonly livenessManager: LivenessManager; // We need to keep the appSource around in case the Deno process needs to be restarted - constructor(manager: AppManager, private readonly appPackage: IParseAppPackageResult, private readonly storageItem: IAppStorageItem) { + constructor( + manager: AppManager, + private readonly appPackage: IParseAppPackageResult, + private readonly storageItem: IAppStorageItem, + ) { super(); this.debug = baseDebug.extend(appPackage.info.id); @@ -388,6 +392,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter { console.error(`Failed to startup Deno subprocess for app ${this.getAppId()}`, err); }); this.once('ready', this.onReady.bind(this)); + this.parseStdout(this.deno.stdout); } @@ -609,43 +614,50 @@ export class DenoRuntimeSubprocessController extends EventEmitter { } private async parseStdout(stream: Readable): Promise { - for await (const message of decoder.decodeStream(stream)) { - this.debug('Received message from subprocess %o', message); - try { - // Process PONG resonse first as it is not JSON RPC - if (message === COMMAND_PONG) { - this.emit('pong'); - continue; - } - - const JSONRPCMessage = jsonrpc.parseObject(message); - - if (Array.isArray(JSONRPCMessage)) { - throw new Error('Invalid message format'); - } - - if (JSONRPCMessage.type === 'request' || JSONRPCMessage.type === 'notification') { - this.handleIncomingMessage(JSONRPCMessage).catch((reason) => - console.error(`[${this.getAppId()}] Error executing handler`, reason, message), - ); - continue; - } - - if (JSONRPCMessage.type === 'success' || JSONRPCMessage.type === 'error') { - this.handleResultMessage(JSONRPCMessage).catch((reason) => console.error(`[${this.getAppId()}] Error executing handler`, reason, message)); - continue; - } - - console.error('Unrecognized message type', JSONRPCMessage); - } catch (e) { - // SyntaxError is thrown when the message is not a valid JSON - if (e instanceof SyntaxError) { - console.error(`[${this.getAppId()}] Failed to parse message`); - continue; + try { + for await (const message of newDecoder().decodeStream(stream)) { + this.debug('Received message from subprocess %o', message); + try { + // Process PONG resonse first as it is not JSON RPC + if (message === COMMAND_PONG) { + this.emit('pong'); + continue; + } + + const JSONRPCMessage = jsonrpc.parseObject(message); + + if (Array.isArray(JSONRPCMessage)) { + throw new Error('Invalid message format'); + } + + if (JSONRPCMessage.type === 'request' || JSONRPCMessage.type === 'notification') { + this.handleIncomingMessage(JSONRPCMessage).catch((reason) => + console.error(`[${this.getAppId()}] Error executing handler`, reason, message), + ); + continue; + } + + if (JSONRPCMessage.type === 'success' || JSONRPCMessage.type === 'error') { + this.handleResultMessage(JSONRPCMessage).catch((reason) => + console.error(`[${this.getAppId()}] Error executing handler`, reason, message), + ); + continue; + } + + console.error('Unrecognized message type', JSONRPCMessage); + } catch (e) { + // SyntaxError is thrown when the message is not a valid JSON + if (e instanceof SyntaxError) { + console.error(`[${this.getAppId()}] Failed to parse message`); + continue; + } + + console.error(`[${this.getAppId()}] Error executing handler`, e, message); } - - console.error(`[${this.getAppId()}] Error executing handler`, e, message); } + } catch (e) { + console.error(`[${this.getAppId()}]`, e); + this.emit('error', new Error('DECODE_ERROR')); } } @@ -653,7 +665,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter { try { const data = JSON.parse(chunk.toString()); - this.debug('Metrics received from subprocess: %o', data); + this.debug('Metrics received from subprocess (via stderr): %o', data); } catch (e) { console.error('Subprocess stderr', chunk.toString()); } diff --git a/packages/apps-engine/src/server/runtime/deno/LivenessManager.ts b/packages/apps-engine/src/server/runtime/deno/LivenessManager.ts index 3f363c5402f1..61d208786f9d 100644 --- a/packages/apps-engine/src/server/runtime/deno/LivenessManager.ts +++ b/packages/apps-engine/src/server/runtime/deno/LivenessManager.ts @@ -67,6 +67,13 @@ export class LivenessManager { this.pingAbortController = new EventEmitter(); this.options = Object.assign({}, defaultOptions, options); + + this.controller.on('ready', () => this.ping()); + this.controller.on('error', async (reason) => { + if (reason instanceof Error && reason.message.startsWith('DECODE_ERROR')) { + await this.restartProcess('Decode error', 'controller'); + } + }); } public getRuntimeData() { @@ -84,7 +91,6 @@ export class LivenessManager { this.pingTimeoutConsecutiveCount = 0; - this.controller.once('ready', () => this.ping()); this.subprocess.once('exit', this.handleExit.bind(this)); this.subprocess.once('error', this.handleError.bind(this)); } @@ -188,7 +194,7 @@ export class LivenessManager { this.restartProcess(reason); } - private async restartProcess(reason: string) { + private async restartProcess(reason: string, source = 'liveness-manager') { if (this.restartCount >= this.options.maxRestarts) { this.debug('Limit of restarts reached (%d). Aborting restart...', this.options.maxRestarts); this.controller.stopApp(); @@ -197,8 +203,8 @@ export class LivenessManager { this.restartLog.push({ reason, + source, restartedAt: new Date(), - source: 'liveness-manager', pid: this.subprocess.pid, }); diff --git a/packages/apps-engine/src/server/runtime/deno/ProcessMessenger.ts b/packages/apps-engine/src/server/runtime/deno/ProcessMessenger.ts index c919adb5f0bb..4d76f4c4c10d 100644 --- a/packages/apps-engine/src/server/runtime/deno/ProcessMessenger.ts +++ b/packages/apps-engine/src/server/runtime/deno/ProcessMessenger.ts @@ -2,11 +2,13 @@ import type { ChildProcess } from 'child_process'; import type { JsonRpc } from 'jsonrpc-lite'; -import { encoder } from './codec'; +import { type Encoder, newEncoder } from './codec'; export class ProcessMessenger { private deno: ChildProcess | undefined; + private encoder: Encoder | undefined; + private _sendStrategy: (message: JsonRpc) => void; constructor(private readonly debug: debug.Debugger) { @@ -25,6 +27,7 @@ export class ProcessMessenger { public clearReceiver() { delete this.deno; + delete this.encoder; this.switchStrategy(); } @@ -32,6 +35,9 @@ export class ProcessMessenger { private switchStrategy() { if (this.deno?.stdin?.writable) { this._sendStrategy = this.strategySend.bind(this); + + // Get a clean encoder + this.encoder = newEncoder(); } else { this._sendStrategy = this.strategyError.bind(this); } @@ -43,6 +49,6 @@ export class ProcessMessenger { private strategySend(message: JsonRpc) { this.debug('Sending message to subprocess %o', message); - this.deno.stdin.write(encoder.encode(message)); + this.deno.stdin.write(this.encoder.encode(message)); } } diff --git a/packages/apps-engine/src/server/runtime/deno/codec.ts b/packages/apps-engine/src/server/runtime/deno/codec.ts index 3af728fb105d..8525c65896ad 100644 --- a/packages/apps-engine/src/server/runtime/deno/codec.ts +++ b/packages/apps-engine/src/server/runtime/deno/codec.ts @@ -1,4 +1,4 @@ -import { Decoder, Encoder, ExtensionCodec } from '@msgpack/msgpack'; +import { Decoder as _Decoder, Encoder as _Encoder, ExtensionCodec } from '@msgpack/msgpack'; const extensionCodec = new ExtensionCodec(); @@ -10,6 +10,7 @@ extensionCodec.register({ return new Uint8Array([0]); } }, + decode: (_data: Uint8Array) => undefined, }); @@ -21,9 +22,24 @@ extensionCodec.register({ return new Uint8Array(object.buffer, object.byteOffset, object.byteLength); } }, + // msgpack will reuse the Uint8Array instance, so WE NEED to copy it instead of simply creating a view decode: (data: Uint8Array) => Buffer.from(data), }); -export const encoder = new Encoder({ extensionCodec }); -export const decoder = new Decoder({ extensionCodec }); +/** + * The Encoder and Decoder classes perform "stateful" operations, i.e. they read from a + * stream, store the data locally and decode it from its buffer. + * + * In practice, this affects the decoder when there is decode error. After an error, the decoder + * keeps the malformed data in its buffer, and even if we try to decode from another source (e.g. different stream) + * it will fail again as there's still data in the buffer. + * + * For that reason, we can't have a singleton instance of Encoder and Decoder, but rather one + * instance for each time we create a new subprocess + */ +export const newEncoder = () => new _Encoder({ extensionCodec }); +export const newDecoder = () => new _Decoder({ extensionCodec }); + +export type Encoder = _Encoder; +export type Decoder = _Decoder;