Skip to content

Commit

Permalink
server: refactor runtime worker creation
Browse files Browse the repository at this point in the history
  • Loading branch information
koush committed Nov 20, 2024
1 parent 02a46a9 commit 53cab91
Show file tree
Hide file tree
Showing 19 changed files with 126 additions and 102 deletions.
4 changes: 2 additions & 2 deletions sdk/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@scrypted/sdk",
"version": "0.3.86",
"version": "0.3.87",
"description": "",
"main": "dist/src/index.js",
"exports": {
Expand Down
4 changes: 2 additions & 2 deletions sdk/types/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/types/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@scrypted/types",
"version": "0.3.79",
"version": "0.3.80",
"description": "",
"main": "dist/index.js",
"author": "",
Expand Down
13 changes: 5 additions & 8 deletions sdk/types/scrypted_python/scrypted_sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,10 @@ class AudioStreamOptions(TypedDict):

class ClusterFork(TypedDict):

clusterWorkerId: str # The id of the cluster worker id that will execute this fork.
filename: str # The filename to execute in the fork. Not supported in all runtimes.
id: str # The id of the device that is associated with this fork.
labels: Any # The labels used to select the cluster worker that will execute this fork.
name: str # The name of this fork. This will be used to set the thread name
nativeId: str # The native id of the mixin that is associated with this fork.
runtime: str # The runtime to use for this fork. If not specified, the current runtime will be used.
clusterWorkerId: str
id: str
labels: Any
runtime: str

class HttpResponseOptions(TypedDict):

Expand Down Expand Up @@ -950,7 +947,7 @@ class TamperState(TypedDict):
pass


TYPES_VERSION = "0.3.79"
TYPES_VERSION = "0.3.80"


class AirPurifier:
Expand Down
2 changes: 1 addition & 1 deletion sdk/types/src/types.input.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2670,7 +2670,7 @@ export interface ForkOptions {
};
}

export interface ClusterFork extends ForkOptions {
export interface ClusterFork {
runtime?: ForkOptions['runtime'];
labels?: ForkOptions['labels'];
id?: ForkOptions['id'];
Expand Down
12 changes: 6 additions & 6 deletions server/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"dependencies": {
"@scrypted/ffmpeg-static": "^6.1.0-build3",
"@scrypted/node-pty": "^1.0.22",
"@scrypted/types": "^0.3.79",
"@scrypted/types": "^0.3.80",
"adm-zip": "^0.5.16",
"body-parser": "^1.20.3",
"cookie-parser": "^1.4.7",
Expand Down
17 changes: 15 additions & 2 deletions server/python/plugin_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ async def loadZipWrapped(self, packageJson, zipAPI: Any, zipOptions: dict):
forkMain = zipOptions and zipOptions.get("fork")
debug = zipOptions.get("debug", None)
plugin_volume = pv.ensure_plugin_volume(self.pluginId)
zipHash = zipOptions.get("zipHash")
zipHash: str = zipOptions.get("zipHash")
plugin_zip_paths = pv.prep(plugin_volume, zipHash)

if debug:
Expand Down Expand Up @@ -824,11 +824,24 @@ async def getZip(self):
if cluster_labels.needs_cluster_fork_worker(options):
peerLiveness = PeerLiveness(self.loop)
async def getClusterFork():
runtimeWorkerOptions = {
"packageJson": packageJson,
"env": None,
"pluginDebug": None,
"zipFile": None,
"unzippedPath": None,
"zipHash": zipHash,
}

forkComponent = await self.api.getComponent("cluster-fork")
sanitizedOptions = options.copy()
sanitizedOptions["runtime"] = sanitizedOptions.get("runtime", "python")
sanitizedOptions["zipHash"] = zipHash
clusterForkResult = await forkComponent.fork(peerLiveness, sanitizedOptions, packageJson, zipHash, lambda: zipAPI.getZip())
clusterForkResult = await forkComponent.fork(
runtimeWorkerOptions,
sanitizedOptions,
peerLiveness, lambda: zipAPI.getZip()
)

async def waitPeerLiveness():
try:
Expand Down
46 changes: 25 additions & 21 deletions server/src/plugin/plugin-host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { WebSocketConnection } from './plugin-remote-websocket';
import { ensurePluginVolume, getScryptedVolume } from './plugin-volume';
import { createClusterForkWorker } from './runtime/cluster-fork-worker';
import { prepareZipSync } from './runtime/node-worker-common';
import { RuntimeWorker } from './runtime/runtime-worker';
import type { RuntimeWorker, RuntimeWorkerOptions } from './runtime/runtime-worker';

const serverVersion = require('../../package.json').version;

Expand Down Expand Up @@ -341,7 +341,15 @@ export class PluginHost {
if (!workerHost)
throw new UnsupportedRuntimeError(this.packageJson.scrypted.runtime);

let peer: Promise<RpcPeer>
let peer: Promise<RpcPeer>;
const runtimeWorkerOptions: RuntimeWorkerOptions = {
packageJson: this.packageJson,
env,
pluginDebug,
unzippedPath: this.unzippedPath,
zipFile: this.zipFile,
zipHash: this.zipHash,
};
if (!needsClusterForkWorker(this.packageJson.scrypted)) {
this.peer = new RpcPeer('host', this.pluginId, (message, reject, serializationContext) => {
if (connected) {
Expand All @@ -354,14 +362,7 @@ export class PluginHost {

peer = Promise.resolve(this.peer);

this.worker = workerHost(this.scrypted.mainFilename, this.pluginId, {
packageJson: this.packageJson,
env,
pluginDebug,
unzippedPath: this.unzippedPath,
zipFile: this.zipFile,
zipHash: this.zipHash,
}, this.scrypted);
this.worker = workerHost(this.scrypted.mainFilename, runtimeWorkerOptions, this.scrypted);

this.worker.setupRpcPeer(this.peer);

Expand All @@ -379,25 +380,28 @@ export class PluginHost {
});

const clusterSetup = setupCluster(this.peer);
const { runtimeWorker, forkPeer, clusterWorkerId } = createClusterForkWorker((async () => {
await clusterSetup.initializeCluster({
clusterId: this.scrypted.clusterId,
clusterSecret: this.scrypted.clusterSecret,
});
return this.scrypted.clusterFork;
})(),
this.zipHash, async () => fs.promises.readFile(this.zipFile),
this.packageJson.scrypted, this.packageJson, clusterSetup.connectRPCObject);
const { runtimeWorker, forkPeer, clusterWorkerId } = createClusterForkWorker(
runtimeWorkerOptions,
this.packageJson.scrypted,
(async () => {
await clusterSetup.initializeCluster({
clusterId: this.scrypted.clusterId,
clusterSecret: this.scrypted.clusterSecret,
});
return this.scrypted.clusterFork;
})(),
async () => fs.promises.readFile(this.zipFile),
clusterSetup.connectRPCObject);

forkPeer.then(peer => {
const originalPeer = this.peer;
originalPeer.killedSafe.finally(() => peer.kill());
this.peer = peer;
peer.killedSafe.finally(() => originalPeer.kill());
}).catch(() => {});
}).catch(() => { });
clusterWorkerId.then(clusterWorkerId => {
console.log('cluster worker id', clusterWorkerId);
}).catch(() => {});
}).catch(() => { });

this.worker = runtimeWorker;
peer = forkPeer;
Expand Down
26 changes: 16 additions & 10 deletions server/src/plugin/plugin-remote-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { createClusterForkWorker } from './runtime/cluster-fork-worker';
import { NodeThreadWorker } from './runtime/node-thread-worker';
import { prepareZip } from './runtime/node-worker-common';
import { getBuiltinRuntimeHosts } from './runtime/runtime-host';
import { RuntimeWorker } from './runtime/runtime-worker';
import { RuntimeWorker, RuntimeWorkerOptions } from './runtime/runtime-worker';
import type { ClusterForkService } from '../services/cluster-fork';
import type { PluginComponent } from '../services/plugin';
import { ClusterManagerImpl } from '../scrypted-cluster-main';
Expand Down Expand Up @@ -216,10 +216,23 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
let nativeWorker: child_process.ChildProcess | worker_threads.Worker;
let clusterWorkerId: Promise<string>;

const runtimeWorkerOptions: RuntimeWorkerOptions = {
packageJson,
env: undefined,
pluginDebug: undefined,
zipFile,
unzippedPath,
zipHash,
};

// if running in a cluster, fork to a matching cluster worker only if necessary.
if (needsClusterForkWorker(options)) {
({ runtimeWorker, forkPeer, clusterWorkerId } = createClusterForkWorker(
api.getComponent('cluster-fork'), zipHash, () => zipAPI.getZip(), options, packageJson, scrypted.connectRPCObject)
runtimeWorkerOptions,
options,
api.getComponent('cluster-fork'),
() => zipAPI.getZip(),
scrypted.connectRPCObject)
);
}
else {
Expand All @@ -228,14 +241,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
const runtime = builtins.get(options.runtime);
if (!runtime)
throw new Error('unknown runtime ' + options.runtime);
runtimeWorker = runtime(mainFilename, pluginId, {
packageJson,
env: undefined,
pluginDebug: undefined,
zipFile,
unzippedPath,
zipHash,
}, undefined);
runtimeWorker = runtime(mainFilename, runtimeWorkerOptions, undefined);

if (runtimeWorker instanceof ChildProcessWorker) {
nativeWorker = runtimeWorker.childProcess;
Expand Down
4 changes: 3 additions & 1 deletion server/src/plugin/runtime/child-process-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import { RpcMessage, RpcPeer } from "../../rpc";
import { RuntimeWorker, RuntimeWorkerOptions } from "./runtime-worker";

export abstract class ChildProcessWorker extends EventEmitter implements RuntimeWorker {
public pluginId: string;
protected worker: child_process.ChildProcess;

get childProcess() {
return this.worker;
}

constructor(public pluginId: string, options: RuntimeWorkerOptions) {
constructor(options: RuntimeWorkerOptions) {
super();
this.pluginId = options.packageJson.name;
}

setupWorker() {
Expand Down
24 changes: 15 additions & 9 deletions server/src/plugin/runtime/cluster-fork-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@ import { RpcPeer } from "../../rpc";
import { PeerLiveness } from "../../scrypted-cluster-main";
import type { ClusterForkService } from "../../services/cluster-fork";
import { writeWorkerGenerator } from "../plugin-console";
import type { RuntimeWorker } from "./runtime-worker";
import type { RuntimeWorker, RuntimeWorkerOptions } from "./runtime-worker";

export function createClusterForkWorker(
runtimeWorkerOptions: RuntimeWorkerOptions,
options: Partial<ClusterFork>,
forkComponentPromise: Promise<ClusterForkService>,
zipHash: string,
getZip: () => Promise<Buffer>,
options: Partial<ClusterFork>,
packageJson: any,
connectRPCObject: (o: any) => Promise<any>) {

// these are specific to the cluster worker host
// and will be set there.
delete runtimeWorkerOptions.zipFile;
delete runtimeWorkerOptions.unzippedPath;

const waitKilled = new Deferred<void>();
waitKilled.promise.finally(() => events.emit('exit'));
const events = new EventEmitter();
Expand All @@ -38,21 +43,22 @@ export function createClusterForkWorker(
});

const peerLiveness = new PeerLiveness(waitKilled.promise);
const clusterForkResultPromise = forkComponentPromise.then(forkComponent => forkComponent.fork(peerLiveness, {
const clusterForkResultPromise = forkComponentPromise.then(forkComponent => forkComponent.fork(runtimeWorkerOptions, {
runtime: options.runtime || 'node',
id: options.id,
...options,
}, packageJson, zipHash, getZip));
clusterForkResultPromise.catch(() => {});
}, peerLiveness,
getZip));
clusterForkResultPromise.catch(() => { });

const clusterWorkerId = clusterForkResultPromise.then(clusterForkResult => clusterForkResult.clusterWorkerId);
clusterWorkerId.catch(() => {});
clusterWorkerId.catch(() => { });

const forkPeer = (async () => {
const clusterForkResult = await clusterForkResultPromise;
waitKilled.promise.finally(() => {
runtimeWorker.pid = undefined;
clusterForkResult.kill().catch(() => {});
clusterForkResult.kill().catch(() => { });
});
clusterForkResult.waitKilled().catch(() => { })
.finally(() => {
Expand Down
6 changes: 3 additions & 3 deletions server/src/plugin/runtime/custom-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ export class CustomRuntimeWorker extends ChildProcessWorker {
serializer: ReturnType<typeof createRpcDuplexSerializer>;
fork: boolean;

constructor(pluginId: string, options: RuntimeWorkerOptions, runtime: ScryptedRuntime) {
super(pluginId, options);
constructor(options: RuntimeWorkerOptions, runtime: ScryptedRuntime) {
super(options);

const pluginDevice = runtime.findPluginDevice(this.pluginId);
const scryptedRuntimeArguments: ScryptedRuntimeArguments = pluginDevice.state.scryptedRuntimeArguments?.value;
Expand All @@ -27,7 +27,7 @@ export class CustomRuntimeWorker extends ChildProcessWorker {
// stdin, stdout, stderr, peer in, peer out
stdio: ['pipe', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe'],
env: Object.assign({}, process.env, env, {
SCRYYPTED_PLUGIN_ID: pluginId,
SCRYYPTED_PLUGIN_ID: this.pluginId,
SCRYPTED_DEBUG_PORT: pluginDebug?.inspectPort?.toString(),
SCRYPTED_UNZIPPED_PATH: options.unzippedPath,
SCRYPTED_ZIP_FILE: options.zipFile,
Expand Down
4 changes: 2 additions & 2 deletions server/src/plugin/runtime/node-fork-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ export function isNodePluginChildProcess() {

export class NodeForkWorker extends ChildProcessWorker {

constructor(mainFilename: string, pluginId: string, options: RuntimeWorkerOptions) {
super(pluginId, options);
constructor(mainFilename: string, options: RuntimeWorkerOptions) {
super(options);

const { env, pluginDebug } = options;

Expand Down
Loading

0 comments on commit 53cab91

Please sign in to comment.