Skip to content

Commit

Permalink
Don't use ChannelMultiplexer in RPCProtocol (eclipse-theia#13980)
Browse files Browse the repository at this point in the history
Fixes eclipse-theia#13960

Contributed on behalf of STMicroelectronics

Signed-off-by: Thomas Mäder <[email protected]>
  • Loading branch information
tsmaeder authored Aug 13, 2024
1 parent 44d42ec commit 817c1a0
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 209 deletions.
143 changes: 0 additions & 143 deletions packages/plugin-ext/src/common/proxy-handler.ts

This file was deleted.

128 changes: 62 additions & 66 deletions packages/plugin-ext/src/common/rpc-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,14 @@

/* eslint-disable @typescript-eslint/no-explicit-any */

import { Channel, Disposable, DisposableCollection, isObject, ReadBuffer, URI, WriteBuffer } from '@theia/core';
import { Channel, Disposable, DisposableCollection, isObject, ReadBuffer, RpcProtocol, URI, WriteBuffer } from '@theia/core';
import { Emitter, Event } from '@theia/core/lib/common/event';
import { ChannelMultiplexer, MessageProvider } from '@theia/core/lib/common/message-rpc/channel';
import { MsgPackMessageDecoder, MsgPackMessageEncoder } from '@theia/core/lib/common/message-rpc/rpc-message-encoder';
import { MessageProvider } from '@theia/core/lib/common/message-rpc/channel';
import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer';
import { ClientProxyHandler, ProxySynchronizer, RpcInvocationHandler } from './proxy-handler';
import { MsgPackExtensionManager } from '@theia/core/lib/common/message-rpc/msg-pack-extension-manager';
import { URI as VSCodeURI } from '@theia/core/shared/vscode-uri';
import { BinaryBuffer } from '@theia/core/lib/common/buffer';
import { Range, Position } from '../plugin/types-impl';
import { Deferred } from '@theia/core/lib/common/promise-util';

export interface MessageConnection {
send(msg: string): void;
Expand Down Expand Up @@ -79,21 +76,36 @@ export namespace ConnectionClosedError {
}

export class RPCProtocolImpl implements RPCProtocol {
private readonly locals = new Map<string, RpcInvocationHandler>();
private readonly locals = new Map<string, any>();
private readonly proxies = new Map<string, any>();
private readonly multiplexer: ChannelMultiplexer;
private readonly encoder = new MsgPackMessageEncoder();
private readonly decoder = new MsgPackMessageDecoder();
private readonly initCallback: ProxySynchronizer;
private readonly rpc: RpcProtocol;

private readonly toDispose = new DisposableCollection(
Disposable.create(() => { /* mark as no disposed */ })
);

constructor(channel: Channel) {
this.toDispose.push(this.multiplexer = new ChannelMultiplexer(new BatchingChannel(channel)));
this.rpc = new RpcProtocol(new BatchingChannel(channel), (method, args) => this.handleRequest(method, args));
this.rpc.onNotification((evt: { method: string; args: any[]; }) => this.handleNotification(evt.method, evt.args));
this.toDispose.push(Disposable.create(() => this.proxies.clear()));
this.initCallback = new ProxySynchronizerImpl();
}

handleNotification(method: any, args: any[]): void {
const serviceId = args[0] as string;
const handler: any = this.locals.get(serviceId);
if (!handler) {
throw new Error(`no local service handler with id ${serviceId}`);
}
handler[method](...(args.slice(1)));
}

handleRequest(method: string, args: any[]): Promise<any> {
const serviceId = args[0] as string;
const handler: any = this.locals.get(serviceId);
if (!handler) {
throw new Error(`no local service handler with id ${serviceId}`);
}
return handler[method](...(args.slice(1)));
}

dispose(): void {
Expand All @@ -117,76 +129,60 @@ export class RPCProtocolImpl implements RPCProtocol {
}

protected createProxy<T>(proxyId: string): T {
const handler = new ClientProxyHandler({
id: proxyId, encoder: this.encoder, decoder: this.decoder, channelProvider: () => this.multiplexer.open(proxyId), proxySynchronizer: this.initCallback
});
const handler = {
get: (target: any, name: string, receiver: any): any => {
if (target[name] || name.charCodeAt(0) !== 36 /* CharCode.DollarSign */) {
// not a remote property
return target[name];
}
const isNotify = this.isNotification(name);
return async (...args: any[]) => {
const method = name.toString();
if (isNotify) {
this.rpc.sendNotification(method, [proxyId, ...args]);
} else {
return await this.rpc.sendRequest(method, [proxyId, ...args]) as Promise<any>;
}
};
}

};
return new Proxy(Object.create(null), handler);
}

/**
* Return whether the given property represents a notification. If true,
* the promise returned from the invocation will resolve immediately to `undefined`
*
* A property leads to a notification rather than a method call if its name
* begins with `notify` or `on`.
*
* @param p - The property being called on the proxy.
* @return Whether `p` represents a notification.
*/
protected isNotification(p: PropertyKey): boolean {
let propertyString = p.toString();
if (propertyString.charCodeAt(0) === 36/* CharCode.DollarSign */) {
propertyString = propertyString.substring(1);
}
return propertyString.startsWith('notify') || propertyString.startsWith('on');
}

set<T, R extends T>(identifier: ProxyIdentifier<T>, instance: R): R {
if (this.isDisposed) {
throw ConnectionClosedError.create();
}
const invocationHandler = this.locals.get(identifier.id);
if (!invocationHandler) {
const handler = new RpcInvocationHandler({ id: identifier.id, target: instance, encoder: this.encoder, decoder: this.decoder });

const channel = this.multiplexer.getOpenChannel(identifier.id);
if (channel) {
handler.listen(channel);
} else {
const channelOpenListener = this.multiplexer.onDidOpenChannel(event => {
if (event.id === identifier.id) {
handler.listen(event.channel);
channelOpenListener.dispose();
}
});
}

this.locals.set(identifier.id, handler);
if (!this.locals.has(identifier.id)) {
this.locals.set(identifier.id, instance);
if (Disposable.is(instance)) {
this.toDispose.push(instance);
}
this.toDispose.push(Disposable.create(() => this.locals.delete(identifier.id)));

}
return instance;
}
}

export class ProxySynchronizerImpl implements ProxySynchronizer {

private readonly runningInitializations = new Set<string>();

private _pendingProxyInitializations: Deferred<void>;

constructor() {
this._pendingProxyInitializations = new Deferred();
/* after creation no init is active */
this._pendingProxyInitializations.resolve();
}

startProxyInitialization(id: string, init: Promise<void>): void {
if (this.runningInitializations.size === 0) {
this._pendingProxyInitializations = new Deferred();
}
init.then(() => this.finishedProxyInitialization(id));
this.runningInitializations.add(id);
}

protected finishedProxyInitialization(id: string): void {
this.runningInitializations.delete(id);
if (this.runningInitializations.size === 0) {
this._pendingProxyInitializations.resolve();
}
}

pendingProxyInitializations(): Promise<void> {
return this._pendingProxyInitializations.promise;
}

}

/**
* Wraps and underlying channel to send/receive multiple messages in one go:
* - multiple messages to be sent from one stack get sent in bulk at `process.nextTick`.
Expand Down

0 comments on commit 817c1a0

Please sign in to comment.