diff --git a/agents-api/agents_api/activities/execute_system.py b/agents-api/agents_api/activities/execute_system.py index a8e13f72d..42b996573 100644 --- a/agents-api/agents_api/activities/execute_system.py +++ b/agents-api/agents_api/activities/execute_system.py @@ -1,6 +1,3 @@ -import asyncio -from concurrent.futures import ProcessPoolExecutor -from functools import partial from typing import Any from uuid import UUID @@ -27,9 +24,6 @@ from .container import container from .utils import get_handler -# For running synchronous code in the background -process_pool_executor = ProcessPoolExecutor() - @lifespan(app, container) # Both are needed because we are using the routes @beartype @@ -115,20 +109,10 @@ async def execute_system( session_id = arguments.pop("session_id", None) create_session_request = CreateSessionRequest(**arguments) - # In case sessions.create becomes asynchronous in the future - if asyncio.iscoroutinefunction(handler): - return await handler() - - # Run the synchronous function in another process - loop = asyncio.get_running_loop() - return await loop.run_in_executor( - process_pool_executor, - partial( - handler, - developer_id=developer_id, - session_id=session_id, - data=create_session_request, - ), + return await handler( + developer_id=developer_id, + session_id=session_id, + data=create_session_request, ) # Handle update session @@ -137,20 +121,10 @@ async def execute_system( session_id = arguments.pop("session_id") update_session_request = UpdateSessionRequest(**arguments) - # In case sessions.update becomes asynchronous in the future - if asyncio.iscoroutinefunction(handler): - return await handler() - - # Run the synchronous function in another process - loop = asyncio.get_running_loop() - return await loop.run_in_executor( - process_pool_executor, - partial( - handler, - developer_id=developer_id, - session_id=session_id, - data=update_session_request, - ), + return await handler( + developer_id=developer_id, + session_id=session_id, + data=update_session_request, ) # Handle update user @@ -159,31 +133,13 @@ async def execute_system( user_id = arguments.pop("user_id") update_user_request = UpdateUserRequest(**arguments) - # In case users.update becomes asynchronous in the future - if asyncio.iscoroutinefunction(handler): - return await handler() - - # Run the synchronous function in another process - loop = asyncio.get_running_loop() - return await loop.run_in_executor( - process_pool_executor, - partial( - handler, - developer_id=developer_id, - user_id=user_id, - data=update_user_request, - ), + return await handler( + developer_id=developer_id, + user_id=user_id, + data=update_user_request, ) - # Handle regular operations - if asyncio.iscoroutinefunction(handler): - return await handler(**arguments) - - # Run the synchronous function in another process - # FIXME: When the handler throws an exception, the process dies and the error is not captured. Instead it throws: - # "concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending." - loop = asyncio.get_running_loop() - return await loop.run_in_executor(process_pool_executor, partial(handler, **arguments)) + return await handler(**arguments) except BaseException as e: if activity.in_activity(): activity.logger.error(f"Error in execute_system_call: {e}")