Skip to content

Commit

Permalink
server: possibly fix bug where rpc object may not be found
Browse files Browse the repository at this point in the history
  • Loading branch information
koush committed Sep 4, 2024
1 parent 1e1755f commit 5f4e279
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 19 deletions.
4 changes: 2 additions & 2 deletions server/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 16 additions & 7 deletions server/python/plugin_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,12 +515,17 @@ def computeClusterObjectHash(o: ClusterObject) -> str:
def isClusterAddress(address: str):
return not address or address == SCRYPTED_CLUSTER_ADDRESS

def onProxySerialization(value: Any, sourceKey: str = None):
def onProxySerialization(peer: rpc.RpcPeer, value: Any, sourceKey: str = None):
properties: dict = rpc.RpcPeer.prepareProxyProperties(value) or {}
clusterEntry = properties.get("__cluster", None)
proxyId: str = (
clusterEntry and clusterEntry.get("proxyId", None)
) or rpc.RpcPeer.generateId()
proxyId: str
existing = peer.localProxied.get(value, None)
if existing:
proxyId = existing["id"]
else:
proxyId = (
clusterEntry and clusterEntry.get("proxyId", None)
) or rpc.RpcPeer.generateId()

if clusterEntry:
if (
Expand All @@ -543,7 +548,9 @@ def onProxySerialization(value: Any, sourceKey: str = None):

return proxyId, properties

self.peer.onProxySerialization = onProxySerialization
self.peer.onProxySerialization = lambda value: onProxySerialization(
self.peer, value, None
)

async def resolveObject(id: str, sourceKey: str):
sourcePeer: rpc.RpcPeer = (
Expand Down Expand Up @@ -571,7 +578,7 @@ async def handleClusterClient(
self.loop, rpcTransport
)
peer.onProxySerialization = lambda value: onProxySerialization(
value, clusterPeerPort
peer, value, clusterPeerKey
)
future: asyncio.Future[rpc.RpcPeer] = asyncio.Future()
future.set_result(peer)
Expand Down Expand Up @@ -621,7 +628,9 @@ async def connectClusterPeer():
self.loop, rpcTransport
)
clusterPeer.onProxySerialization = (
lambda value: onProxySerialization(value, clusterPeerKey)
lambda value: onProxySerialization(
clusterPeer, value, clusterPeerKey
)
)
except:
clusterPeers.pop(clusterPeerKey)
Expand Down
13 changes: 11 additions & 2 deletions server/python/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,21 @@ def serialize(self, value, serializationContext: Dict):

proxiedEntry = self.localProxied.get(value, None)
if proxiedEntry:
if self.onProxySerialization:
proxyId, __remote_proxy_props = self.onProxySerialization(value)
else:
__remote_proxy_props = RpcPeer.prepareProxyProperties(value)
proxyId = proxiedEntry['id']

if proxyId != proxiedEntry['id']:
raise Exception('onProxySerialization proxy id mismatch')

proxiedEntry['finalizerId'] = RpcPeer.generateId()
ret = {
'__remote_proxy_id': proxiedEntry['id'],
'__remote_proxy_id': proxyId,
'__remote_proxy_finalizer_id': proxiedEntry['finalizerId'],
'__remote_constructor_name': __remote_constructor_name,
'__remote_proxy_props': RpcPeer.prepareProxyProperties(value),
'__remote_proxy_props': __remote_proxy_props,
'__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None),
}
return ret
Expand Down
13 changes: 7 additions & 6 deletions server/src/plugin/plugin-remote-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,13 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
return !address || address === SCRYPTED_CLUSTER_ADDRESS;
}

const onProxySerialization = (value: any, sourceKey?: string) => {
const onProxySerialization = (peer: RpcPeer, value: any, sourceKey: string) => {
const properties = RpcPeer.prepareProxyProperties(value) || {};
let clusterEntry: ClusterObject = properties.__cluster;

// ensure globally stable proxyIds.
// worker threads will embed their pid and tid in the proxy id for cross worker fast path.
const proxyId = clusterEntry?.proxyId || `n-${process.pid}-${worker_threads.threadId}-${RpcPeer.generateId()}`;
const proxyId = peer.localProxied.get(value)?.id || clusterEntry?.proxyId || `n-${process.pid}-${worker_threads.threadId}-${RpcPeer.generateId()}`;

// if the cluster entry already exists, check if it belongs to this node.
// if it belongs to this node, the entry must also be for this peer.
Expand Down Expand Up @@ -148,7 +148,8 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
properties,
};
}
peer.onProxySerialization = onProxySerialization;

peer.onProxySerialization = value => onProxySerialization(peer, value, undefined);

const resolveObject = async (id: string, sourceKey: string) => {
const sourcePeer = sourceKey
Expand Down Expand Up @@ -180,7 +181,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
const clusterPeer = createDuplexRpcPeer(peer.selfName, clusterPeerKey, client, client);
// the listening peer sourceKey (client address/port) is used by the OTHER peer (the client)
// to determine if it is already connected to THIS peer (the server).
clusterPeer.onProxySerialization = (value) => onProxySerialization(value, clusterPeerKey);
clusterPeer.onProxySerialization = (value) => onProxySerialization(clusterPeer, value, clusterPeerKey);
clusterPeers.set(clusterPeerKey, Promise.resolve(clusterPeer));
startPluginRemoteOptions?.onClusterPeer?.(clusterPeer);
const connectRPCObject: ConnectRPCObject = async (o) => {
Expand Down Expand Up @@ -221,7 +222,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
console.warn("source address mismatch", sourceAddress);

const clusterPeer = createDuplexRpcPeer(peer.selfName, clusterPeerKey, socket, socket);
clusterPeer.onProxySerialization = (value) => onProxySerialization(value, clusterPeerKey);
clusterPeer.onProxySerialization = (value) => onProxySerialization(clusterPeer, value, clusterPeerKey);
return clusterPeer;
}
catch (e) {
Expand Down Expand Up @@ -272,7 +273,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
}
peerPromise = tidDeferred.promise.then(port => {
const threadPeer = NodeThreadWorker.createRpcPeer(peer.selfName, threadPeerKey, port);
threadPeer.onProxySerialization = value => onProxySerialization(value, threadPeerKey);
threadPeer.onProxySerialization = value => onProxySerialization(threadPeer, value, threadPeerKey);

const connectRPCObject: ConnectRPCObject = async (o) => {
const sha256 = computeClusterObjectHash(o, clusterSecret);
Expand Down
16 changes: 14 additions & 2 deletions server/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -594,13 +594,25 @@ export class RpcPeer {

let proxiedEntry = this.localProxied.get(value);
if (proxiedEntry) {
const {
proxyId: __remote_proxy_id,
properties: __remote_proxy_props,
} = this.onProxySerialization?.(value)
|| {
proxyId: proxiedEntry.id,
properties: RpcPeer.prepareProxyProperties(value),
};

if (__remote_proxy_id !== proxiedEntry.id)
throw new Error('onProxySerialization proxy id mismatch');

const __remote_proxy_finalizer_id = RpcPeer.generateId();
proxiedEntry.finalizerId = __remote_proxy_finalizer_id;
const ret: RpcRemoteProxyValue = {
__remote_proxy_id: proxiedEntry.id,
__remote_proxy_id,
__remote_proxy_finalizer_id,
__remote_constructor_name,
__remote_proxy_props: RpcPeer.prepareProxyProperties(value),
__remote_proxy_props,
__remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS],
}
return ret;
Expand Down

0 comments on commit 5f4e279

Please sign in to comment.