Skip to content

Commit

Permalink
feat: add async extension in Python binding in order to use asyncio m…
Browse files Browse the repository at this point in the history
…ore conveniently

docs: fix local registry and names (#89)

feat: enhance default_extension_go for template mode (#88)

Co-authored-by: Hu Yueh-Wei <[email protected]>

docs: add a tutorial for running model locally in a Python extension (#90)

Co-authored-by: Hu Yueh-Wei <[email protected]>

fix: rename test cases

feat: upload runtime only, ignore others (#93)

fix: no need to package src (#92)

feat: add a python script for auto update package versions according … (#85)

Co-authored-by: Hu Yueh-Wei <[email protected]>

docs: updating ten_agent_server from astra_agents_dev (#95)

* docs: updating ten_agent_server from astra_agents_dev

Updating ten_agent_server from astra_agents_dev

* docs: fixing typo

Fixing typo

fix: fix some integration test cases (#94)

feat: add cargo config auto-gen (#51)

Co-authored-by: Hu Yueh-Wei <[email protected]>

chore: update version and publish to cloud store automatically (#97)

Co-authored-by: Hu Yueh-Wei <[email protected]>

fix: refine codes

fix: update versions of dependencies

fix: refine codes

fix: add TenEnvAsync class for AsyncExtension

fix: add more comments

chore: improve and supplement Python test cases

fix: refine naming

fix: refine naming

fix: refine codes

fix: join thread in destructor of AsyncExtension

fix: forked repo can not get version since tag is not synced (#99)

doc: update summary part

feat: add linux/arm64 (#98)

Co-authored-by: Hu Yueh-Wei <[email protected]>

chore: update to latest ten_gn

chore: add more CI test cases (#136)

fix: join thread before on_deinit_done to prevent from memory leak in Python binding

fix: create a new Python thread to avoid blocking extension thread
  • Loading branch information
sunxilin authored and halajohn committed Oct 14, 2024
1 parent 96f853e commit fffc802
Show file tree
Hide file tree
Showing 62 changed files with 2,327 additions and 79 deletions.
5 changes: 3 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,10 @@
"program": "${workspaceFolder}/out/linux/x64/ten_manager/bin/tman",
"cwd": "${workspaceFolder}/out/linux/x64/",
"args": [
"--verbose",
"dev-server",
"--base-dir=/home/wei/MyData/Temp/ASTRA.ai/agents",
"--port=49484"
"--base-dir=/home/sunxilin/ten_framework_internal_base/ten_framework/out/linux/x64/tests/ten_runtime/integration/python/two_async_extensions_in_different_groups_python/two_async_extensions_in_different_groups_python_app",
"--port=49483"
],
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ typedef struct ten_py_ten_env_t {

// Mark whether the gil state need to be released after 'on_deinit_done'.
bool need_to_release_gil_state;
PyThreadState* py_thread_state;
} ten_py_ten_env_t;

TEN_RUNTIME_PRIVATE_API bool ten_py_ten_env_check_integrity(
Expand Down
4 changes: 4 additions & 0 deletions core/src/ten_runtime/binding/python/interface/ten/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#
from .app import App
from .extension import Extension
from .async_extension import AsyncExtension
from .async_ten_env import AsyncTenEnv
from .addon import Addon
from .decorator import (
register_addon_as_extension,
Expand All @@ -28,7 +30,9 @@
"register_addon_as_extension_group",
"App",
"Extension",
"AsyncExtension",
"TenEnv",
"AsyncTenEnv",
"Cmd",
"StatusCode",
"VideoFrame",
Expand Down
3 changes: 3 additions & 0 deletions core/src/ten_runtime/binding/python/interface/ten/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ def on_configure(self, ten_env: TenEnv) -> None:

def on_init(self, ten_env: TenEnv) -> None:
ten_env.on_init_done()

def on_deinit(self, ten_env: TenEnv) -> None:
ten_env.on_deinit_done()
140 changes: 140 additions & 0 deletions core/src/ten_runtime/binding/python/interface/ten/async_extension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#
# Copyright © 2024 Agora
# This file is part of TEN Framework, an open source project.
# Licensed under the Apache License, Version 2.0, with certain conditions.
# Refer to the "LICENSE" file in the root directory for more information.
#
import asyncio
import threading
from typing import final
from libten_runtime_python import _Extension
from .video_frame import VideoFrame
from .audio_frame import AudioFrame
from .ten_env import TenEnv
from .cmd import Cmd
from .data import Data
from .async_ten_env import AsyncTenEnv


class AsyncExtension(_Extension):
def __init__(self, name: str) -> None:
self._ten_stop_event = asyncio.Event()

def __del__(self) -> None:
self._ten_stop_event.set()
if hasattr(self, "_ten_thread"):
self._ten_thread.join()

async def _thread_routine(self, ten_env: TenEnv):
self._ten_loop = asyncio.get_running_loop()
self._async_ten_env = AsyncTenEnv(
ten_env, self._ten_loop, self._ten_thread
)

await self.on_configure(self._async_ten_env)

# Suspend the thread until stopEvent is set.
await self._ten_stop_event.wait()

await self.on_deinit(self._async_ten_env)

async def _stop_thread(self):
self._ten_stop_event.set()

@final
def _proxy_on_configure(self, ten_env: TenEnv) -> None:
# We pass the TenEnv object to another Python thread without worrying
# about the thread safety issue of the TenEnv API, because the actual
# execution logic of all TenEnv APIs occurs in the extension thread.
# We only need to ensure that the TenEnv object should remain valid
# while it is being used. The way to achieve this is to ensure that the
# Python thread remains alive until TenEnv.on_deinit_done is called.
self._ten_thread = threading.Thread(
target=asyncio.run, args=(self._thread_routine(ten_env),)
)
self._ten_thread.start()

@final
def _proxy_on_init(self, ten_env: TenEnv) -> None:
asyncio.run_coroutine_threadsafe(
self.on_init(self._async_ten_env), self._ten_loop
)

@final
def _proxy_on_start(self, ten_env: TenEnv) -> None:
asyncio.run_coroutine_threadsafe(
self.on_start(self._async_ten_env), self._ten_loop
)

@final
def _proxy_on_stop(self, ten_env: TenEnv) -> None:
asyncio.run_coroutine_threadsafe(
self.on_stop(self._async_ten_env), self._ten_loop
)

@final
def _proxy_on_deinit(self, ten_env: TenEnv) -> None:
asyncio.run_coroutine_threadsafe(self._stop_thread(), self._ten_loop)

@final
def _proxy_on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None:
asyncio.run_coroutine_threadsafe(
self.on_cmd(self._async_ten_env, cmd), self._ten_loop
)

@final
def _proxy_on_data(self, ten_env: TenEnv, data: Data) -> None:
asyncio.run_coroutine_threadsafe(
self.on_data(self._async_ten_env, data), self._ten_loop
)

@final
def _proxy_on_video_frame(
self, ten_env: TenEnv, video_frame: VideoFrame
) -> None:
asyncio.run_coroutine_threadsafe(
self.on_video_frame(self._async_ten_env, video_frame),
self._ten_loop,
)

@final
def _proxy_on_audio_frame(
self, ten_env: TenEnv, audio_frame: AudioFrame
) -> None:
asyncio.run_coroutine_threadsafe(
self.on_audio_frame(self._async_ten_env, audio_frame),
self._ten_loop,
)

# Override these methods in your extension

async def on_configure(self, async_ten_env: AsyncTenEnv) -> None:
async_ten_env.on_configure_done()

async def on_init(self, async_ten_env: AsyncTenEnv) -> None:
async_ten_env.on_init_done()

async def on_start(self, async_ten_env: AsyncTenEnv) -> None:
async_ten_env.on_start_done()

async def on_stop(self, async_ten_env: AsyncTenEnv) -> None:
async_ten_env.on_stop_done()

async def on_deinit(self, async_ten_env: AsyncTenEnv) -> None:
async_ten_env.on_deinit_done()

async def on_cmd(self, async_ten_env: AsyncTenEnv, cmd: Cmd) -> None:
pass

async def on_data(self, async_ten_env: AsyncTenEnv, data: Data) -> None:
pass

async def on_video_frame(
self, async_ten_env: AsyncTenEnv, video_frame: VideoFrame
) -> None:
pass

async def on_audio_frame(
self, async_ten_env: AsyncTenEnv, audio_frame: AudioFrame
) -> None:
pass
58 changes: 58 additions & 0 deletions core/src/ten_runtime/binding/python/interface/ten/async_ten_env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#
# Copyright © 2024 Agora
# This file is part of TEN Framework, an open source project.
# Licensed under the Apache License, Version 2.0, with certain conditions.
# Refer to the "LICENSE" file in the root directory for more information.
#
from asyncio import AbstractEventLoop
import asyncio
import threading
from .cmd import Cmd
from .cmd_result import CmdResult
from .ten_env import TenEnv


class AsyncTenEnv(TenEnv):

def __init__(
self, ten_env: TenEnv, loop: AbstractEventLoop, thread: threading.Thread
) -> None:
self._internal = ten_env._internal
self._ten_loop = loop
self._ten_thread = thread
ten_env._set_release_handler(lambda: self._on_release())

def __del__(self) -> None:
pass

async def send_cmd(self, cmd: Cmd) -> CmdResult:
q = asyncio.Queue(1)
self._internal.send_cmd(
cmd,
lambda ten_env, result: asyncio.run_coroutine_threadsafe(
q.put(result), self._ten_loop
), # type: ignore
)
return await q.get()

async def send_json(self, json_str: str) -> CmdResult:
q = asyncio.Queue(1)
self._internal.send_json(
json_str,
lambda ten_env, result: asyncio.run_coroutine_threadsafe(
q.put(result), self._ten_loop
), # type: ignore
)
return await q.get()

def _deinit_routine(self) -> None:
self._ten_thread.join()
self._internal.on_deinit_done()

def _on_release(self) -> None:
if hasattr(self, "_deinit_thread"):
self._deinit_thread.join()

def on_deinit_done(self) -> None:
self._deinit_thread = threading.Thread(target=self._deinit_routine)
self._deinit_thread.start()
32 changes: 32 additions & 0 deletions core/src/ten_runtime/binding/python/interface/ten/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,58 @@ def _proxy_on_configure(self, ten_env: TenEnv) -> None:
def on_configure(self, ten_env: TenEnv) -> None:
ten_env.on_configure_done()

@final
def _proxy_on_init(self, ten_env: TenEnv) -> None:
self.on_init(ten_env)

def on_init(self, ten_env: TenEnv) -> None:
ten_env.on_init_done()

@final
def _proxy_on_start(self, ten_env: TenEnv) -> None:
self.on_start(ten_env)

def on_start(self, ten_env: TenEnv) -> None:
ten_env.on_start_done()

@final
def _proxy_on_stop(self, ten_env: TenEnv) -> None:
self.on_stop(ten_env)

def on_stop(self, ten_env: TenEnv) -> None:
ten_env.on_stop_done()

@final
def _proxy_on_deinit(self, ten_env: TenEnv) -> None:
self.on_deinit(ten_env)

def on_deinit(self, ten_env: TenEnv) -> None:
ten_env.on_deinit_done()

@final
def _proxy_on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None:
self.on_cmd(ten_env, cmd)

def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None:
pass

@final
def _proxy_on_data(self, ten_env: TenEnv, data: Data) -> None:
self.on_data(ten_env, data)

def on_data(self, ten_env: TenEnv, data: Data) -> None:
pass

@final
def _proxy_on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None:
self.on_video_frame(ten_env, video_frame)

def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None:
pass

@final
def _proxy_on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None:
self.on_audio_frame(ten_env, audio_frame)

def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None:
pass
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ class _App:
self,
ten_env: _TenEnv,
) -> None: ...
def on_deinit(
self,
ten_env: _TenEnv,
) -> None: ...

class _Extension:
def __init__(self, name: str): ...
Expand Down
7 changes: 7 additions & 0 deletions core/src/ten_runtime/binding/python/interface/ten/ten_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ def __init__(self, internal_obj: _TenEnv) -> None:
def __del__(self) -> None:
pass

def _set_release_handler(self, handler: Callable[[], None]) -> None:
self._release_handler = handler

def _on_release(self) -> None:
if hasattr(self, "_release_handler"):
self._release_handler()

def on_configure_done(self) -> None:
from .addon import Addon

Expand Down
2 changes: 1 addition & 1 deletion core/src/ten_runtime/binding/python/native/app/app.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ static void proxy_on_configure(ten_app_t *app, ten_env_t *ten_env) {
// achieving numerical consistency between PyGILState_Ensure and
// PyGILState_Release, and only then will the Python thread state be
// released.
ten_py_eval_save_thread();
py_ten_env->py_thread_state = ten_py_eval_save_thread();
} else {
// No need to release the GIL.
}
Expand Down
Loading

0 comments on commit fffc802

Please sign in to comment.