From fa066bde0b13d5de79c68fe0a4376659a4143574 Mon Sep 17 00:00:00 2001 From: Hu Yueh-Wei Date: Tue, 19 Nov 2024 12:05:03 +0800 Subject: [PATCH 1/2] feat: change the return value of python async send_cmd to async generator --- .../python/interface/ten/async_ten_env.py | 32 +++++++++++++------ .../aio_http_server_python/main.py | 15 +++++---- .../extension.py | 2 +- .../default_extension_python/extension.py | 2 +- .../default_extension_python/extension.py | 6 ++-- .../default_extension_python/extension.py | 2 +- .../default_extension_python/extension.py | 2 +- 7 files changed, 38 insertions(+), 23 deletions(-) diff --git a/core/src/ten_runtime/binding/python/interface/ten/async_ten_env.py b/core/src/ten_runtime/binding/python/interface/ten/async_ten_env.py index 799abea65..14e00252e 100644 --- a/core/src/ten_runtime/binding/python/interface/ten/async_ten_env.py +++ b/core/src/ten_runtime/binding/python/interface/ten/async_ten_env.py @@ -10,6 +10,7 @@ from .cmd import Cmd from .cmd_result import CmdResult from .ten_env import TenEnv +from typing import AsyncGenerator class AsyncTenEnv(TenEnv): @@ -25,25 +26,38 @@ def __init__( def __del__(self) -> None: pass - async def send_cmd(self, cmd: Cmd) -> CmdResult: - q = asyncio.Queue(1) + async def send_cmd(self, cmd: Cmd) -> AsyncGenerator[CmdResult, None]: + q = asyncio.Queue(maxsize=10) self._internal.send_cmd( cmd, - lambda ten_env, result: asyncio.run_coroutine_threadsafe( + lambda _, result: asyncio.run_coroutine_threadsafe( q.put(result), self._ten_loop - ), # type: ignore + ), ) - return await q.get() - async def send_json(self, json_str: str) -> CmdResult: - q = asyncio.Queue(1) + while True: + result: CmdResult = await q.get() + if result.is_completed(): + yield result + # This is the final result, so break the while loop. + break + yield result + + async def send_json(self, json_str: str) -> AsyncGenerator[CmdResult, None]: + q = asyncio.Queue(maxsize=10) self._internal.send_json( json_str, lambda ten_env, result: asyncio.run_coroutine_threadsafe( q.put(result), self._ten_loop - ), # type: ignore + ), ) - return await q.get() + while True: + result: CmdResult = await q.get() + if result.is_completed(): + yield result + # This is the final result, so break the while loop. + break + yield result def _deinit_routine(self) -> None: # Wait for the internal thread to finish. diff --git a/packages/example_extensions/aio_http_server_python/main.py b/packages/example_extensions/aio_http_server_python/main.py index a6e11e17b..776df1ffd 100644 --- a/packages/example_extensions/aio_http_server_python/main.py +++ b/packages/example_extensions/aio_http_server_python/main.py @@ -19,6 +19,13 @@ class HttpServerExtension(AsyncExtension): + async def _send_close_app_cmd(self): + close_app_cmd_json = ( + '{"_ten":{"type":"close_app",' '"dest":[{"app":"localhost"}]}}' + ) + async for _ in self.ten_env.send_json(close_app_cmd_json): + pass + async def default_handler(self, request: web_request.Request): # Parse the json body. try: @@ -37,11 +44,7 @@ async def default_handler(self, request: web_request.Request): else: # If the command is a 'close_app' command, send it to the app. if "type" in data["_ten"] and data["_ten"]["type"] == "close_app": - close_app_cmd_json = ( - '{"_ten":{"type":"close_app",' - '"dest":[{"app":"localhost"}]}}' - ) - asyncio.create_task(self.ten_env.send_json(close_app_cmd_json)) + asyncio.create_task(self._send_close_app_cmd()) return web.Response(status=200, text="OK") elif "name" in data["_ten"]: # Send the command to the TEN runtime. @@ -53,7 +56,7 @@ async def default_handler(self, request: web_request.Request): if cmd is None: return web.Response(status=400, text="Bad request") - cmd_result = await self.ten_env.send_cmd(cmd) + cmd_result = await anext(self.ten_env.send_cmd(cmd)) else: return web.Response(status=404, text="Not found") diff --git a/tests/ten_runtime/integration/python/async_extension_basic_python/async_extension_basic_python_app/ten_packages/extension/default_async_extension_python/extension.py b/tests/ten_runtime/integration/python/async_extension_basic_python/async_extension_basic_python_app/ten_packages/extension/default_async_extension_python/extension.py index d6741075d..3ee59bccb 100644 --- a/tests/ten_runtime/integration/python/async_extension_basic_python/async_extension_basic_python_app/ten_packages/extension/default_async_extension_python/extension.py +++ b/tests/ten_runtime/integration/python/async_extension_basic_python/async_extension_basic_python_app/ten_packages/extension/default_async_extension_python/extension.py @@ -48,7 +48,7 @@ async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: # Send a new command to other extensions and wait for the result. The # result will be returned to the original sender. new_cmd = Cmd.create("hello") - cmd_result = await ten_env.send_cmd(new_cmd) + cmd_result = await anext(ten_env.send_cmd(new_cmd)) ten_env.return_result(cmd_result, cmd) async def on_stop(self, ten_env: AsyncTenEnv) -> None: diff --git a/tests/ten_runtime/integration/python/async_io_basic_python/async_io_basic_python_app/ten_packages/extension/default_extension_python/extension.py b/tests/ten_runtime/integration/python/async_io_basic_python/async_io_basic_python_app/ten_packages/extension/default_extension_python/extension.py index 914ba86fa..731062862 100644 --- a/tests/ten_runtime/integration/python/async_io_basic_python/async_io_basic_python_app/ten_packages/extension/default_extension_python/extension.py +++ b/tests/ten_runtime/integration/python/async_io_basic_python/async_io_basic_python_app/ten_packages/extension/default_extension_python/extension.py @@ -33,7 +33,7 @@ async def stop_thread(self): async def send_cmd_async(self, ten_env: TenEnv, cmd: Cmd) -> CmdResult: print("DefaultExtension send_cmd_async") - q = asyncio.Queue(1) + q = asyncio.Queue(maxsize=10) ten_env.send_cmd( cmd, lambda ten_env, result: asyncio.run_coroutine_threadsafe( diff --git a/tests/ten_runtime/integration/python/go_app_async_extension_python/go_app_async_extension_python_app/ten_packages/extension/default_extension_python/extension.py b/tests/ten_runtime/integration/python/go_app_async_extension_python/go_app_async_extension_python_app/ten_packages/extension/default_extension_python/extension.py index 0508b3791..e9acdfc59 100644 --- a/tests/ten_runtime/integration/python/go_app_async_extension_python/go_app_async_extension_python_app/ten_packages/extension/default_extension_python/extension.py +++ b/tests/ten_runtime/integration/python/go_app_async_extension_python/go_app_async_extension_python_app/ten_packages/extension/default_extension_python/extension.py @@ -51,9 +51,7 @@ async def greeting(self, ten_env: AsyncTenEnv) -> CmdResult: await asyncio.sleep(1) new_cmd = Cmd.create("greeting") - return await ten_env.send_cmd( - new_cmd, - ) + return await anext(ten_env.send_cmd(new_cmd)) async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: cmd_json = cmd.to_json() @@ -66,7 +64,7 @@ async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: await asyncio.sleep(0.5) - result = await ten_env.send_cmd(new_cmd) + result = await anext(ten_env.send_cmd(new_cmd)) statusCode = result.get_status_code() detail = result.get_property_string("detail") diff --git a/tests/ten_runtime/integration/python/two_async_exts_one_group_python/two_async_exts_one_group_python_app/ten_packages/extension/default_extension_python/extension.py b/tests/ten_runtime/integration/python/two_async_exts_one_group_python/two_async_exts_one_group_python_app/ten_packages/extension/default_extension_python/extension.py index b1c8e767c..fd9e5dd40 100644 --- a/tests/ten_runtime/integration/python/two_async_exts_one_group_python/two_async_exts_one_group_python_app/ten_packages/extension/default_extension_python/extension.py +++ b/tests/ten_runtime/integration/python/two_async_exts_one_group_python/two_async_exts_one_group_python_app/ten_packages/extension/default_extension_python/extension.py @@ -41,7 +41,7 @@ async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: # Send a new command to other extensions and wait for the result. The # result will be returned to the original sender. new_cmd = Cmd.create("hello") - cmd_result = await ten_env.send_cmd(new_cmd) + cmd_result = await anext(ten_env.send_cmd(new_cmd)) ten_env.return_result(cmd_result, cmd) async def on_stop(self, ten_env: AsyncTenEnv) -> None: diff --git a/tests/ten_runtime/integration/python/two_async_exts_python/two_async_exts_python_app/ten_packages/extension/default_extension_python/extension.py b/tests/ten_runtime/integration/python/two_async_exts_python/two_async_exts_python_app/ten_packages/extension/default_extension_python/extension.py index 84ce80c07..637eda594 100644 --- a/tests/ten_runtime/integration/python/two_async_exts_python/two_async_exts_python_app/ten_packages/extension/default_extension_python/extension.py +++ b/tests/ten_runtime/integration/python/two_async_exts_python/two_async_exts_python_app/ten_packages/extension/default_extension_python/extension.py @@ -42,7 +42,7 @@ async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd) -> None: # Send a new command to other extensions and wait for the result. The # result will be returned to the original sender. new_cmd = Cmd.create("hello") - cmd_result = await ten_env.send_cmd(new_cmd) + cmd_result = await anext(ten_env.send_cmd(new_cmd)) ten_env.return_result(cmd_result, cmd) async def on_stop(self, ten_env: AsyncTenEnv) -> None: From f6126f56dca716035f5085134f8b6e94d415e273 Mon Sep 17 00:00:00 2001 From: Hu Yueh-Wei Date: Tue, 19 Nov 2024 12:10:02 +0800 Subject: [PATCH 2/2] feat: change the return value of python async send_cmd to async generator --- .../aio_http_server_python/main.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/packages/example_extensions/aio_http_server_python/main.py b/packages/example_extensions/aio_http_server_python/main.py index 776df1ffd..b3441258e 100644 --- a/packages/example_extensions/aio_http_server_python/main.py +++ b/packages/example_extensions/aio_http_server_python/main.py @@ -19,13 +19,6 @@ class HttpServerExtension(AsyncExtension): - async def _send_close_app_cmd(self): - close_app_cmd_json = ( - '{"_ten":{"type":"close_app",' '"dest":[{"app":"localhost"}]}}' - ) - async for _ in self.ten_env.send_json(close_app_cmd_json): - pass - async def default_handler(self, request: web_request.Request): # Parse the json body. try: @@ -44,7 +37,13 @@ async def default_handler(self, request: web_request.Request): else: # If the command is a 'close_app' command, send it to the app. if "type" in data["_ten"] and data["_ten"]["type"] == "close_app": - asyncio.create_task(self._send_close_app_cmd()) + close_app_cmd_json = ( + '{"_ten":{"type":"close_app",' + '"dest":[{"app":"localhost"}]}}' + ) + asyncio.create_task( + anext(self.ten_env.send_json(close_app_cmd_json)) + ) return web.Response(status=200, text="OK") elif "name" in data["_ten"]: # Send the command to the TEN runtime.