Skip to content

Commit

Permalink
server: exit hooks for python fork
Browse files Browse the repository at this point in the history
  • Loading branch information
koush committed Nov 20, 2024
1 parent aed6e0c commit 69f4de6
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 18 deletions.
4 changes: 2 additions & 2 deletions sdk/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@scrypted/sdk",
"version": "0.3.84",
"version": "0.3.86",
"description": "",
"main": "dist/src/index.js",
"exports": {
Expand Down
4 changes: 2 additions & 2 deletions sdk/types/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/types/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@scrypted/types",
"version": "0.3.77",
"version": "0.3.79",
"description": "",
"main": "dist/index.js",
"author": "",
Expand Down
1 change: 1 addition & 0 deletions sdk/types/scrypted_python/scrypted_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
class PluginFork:
result: asyncio.Task
worker: Process
exit: asyncio.Task
def terminate(self):
pass

Expand Down
2 changes: 1 addition & 1 deletion sdk/types/scrypted_python/scrypted_sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ class TamperState(TypedDict):
pass


TYPES_VERSION = "0.3.77"
TYPES_VERSION = "0.3.79"


class AirPurifier:
Expand Down
8 changes: 4 additions & 4 deletions server/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"dependencies": {
"@scrypted/ffmpeg-static": "^6.1.0-build3",
"@scrypted/node-pty": "^1.0.22",
"@scrypted/types": "^0.3.77",
"@scrypted/types": "^0.3.79",
"adm-zip": "^0.5.16",
"body-parser": "^1.20.3",
"cookie-parser": "^1.4.7",
Expand Down
38 changes: 32 additions & 6 deletions server/python/plugin_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,12 @@ def __init__(self, loop: AbstractEventLoop):
async def waitKilled(self):
await self.killed

def safe_set_result(fut: Future, result: Any):
try:
fut.set_result(result)
except:
pass

class PluginRemote:
def __init__(
self, clusterSetup: ClusterSetup, api, pluginId: str, hostInfo, loop: AbstractEventLoop
Expand Down Expand Up @@ -834,13 +840,13 @@ async def waitPeerLiveness():
pass
asyncio.ensure_future(waitPeerLiveness(), loop=self.loop)

async def waitClusterForkResult():
async def waitClusterForkKilled():
try:
await clusterForkResult.waitKilled()
except:
pass
peerLiveness.killed.set_result(None)
asyncio.ensure_future(waitClusterForkResult(), loop=self.loop)
safe_set_result(peerLiveness.killed, None)
asyncio.ensure_future(waitClusterForkKilled(), loop=self.loop)

clusterGetRemote = await self.clusterSetup.connectRPCObject(await clusterForkResult.getResult())
remoteDict = await clusterGetRemote()
Expand All @@ -858,9 +864,20 @@ async def waitClusterForkResult():

pluginFork = PluginFork()
pluginFork.result = asyncio.create_task(getClusterFork())
pluginFork.terminate = lambda: peerLiveness.killed.set_result(None)
async def waitKilled():
await peerLiveness.killed
pluginFork.exit = asyncio.create_task(waitKilled())
def terminate():
safe_set_result(peerLiveness.killed, None)
pluginFork.worker.terminate()
pluginFork.terminate = terminate

pluginFork.worker = None

return pluginFork

t: asyncio.Task
t.cancel()
if options:
runtime = options.get("runtime", None)
if runtime and runtime != "python":
Expand All @@ -869,17 +886,26 @@ async def waitClusterForkResult():
raise Exception("python fork to filename not supported")

parent_conn, child_conn = multiprocessing.Pipe()

pluginFork = PluginFork()
print("new fork")
killed = Future(loop=self.loop)
async def waitKilled():
await killed
pluginFork.exit = asyncio.create_task(waitKilled())
def terminate():
safe_set_result(killed, None)
pluginFork.worker.kill()
pluginFork.terminate = terminate

pluginFork.worker = multiprocessing.Process(
target=plugin_fork, args=(child_conn,), daemon=True
)
pluginFork.worker.start()
pluginFork.terminate = lambda: pluginFork.worker.kill()

def schedule_exit_check():
def exit_check():
if pluginFork.worker.exitcode != None:
safe_set_result(killed, None)
pluginFork.worker.join()
else:
schedule_exit_check()
Expand Down

0 comments on commit 69f4de6

Please sign in to comment.