Skip to content

Commit

Permalink
Merge pull request #1014 from julep-ai/x/execute-system
Browse files Browse the repository at this point in the history
fix: Fix handlers calls
  • Loading branch information
whiterabbit1983 authored Jan 4, 2025
2 parents f673b23 + 2f60f03 commit c567cdc
Showing 1 changed file with 13 additions and 57 deletions.
70 changes: 13 additions & 57 deletions agents-api/agents_api/activities/execute_system.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from typing import Any
from uuid import UUID

Expand All @@ -27,9 +24,6 @@
from .container import container
from .utils import get_handler

# For running synchronous code in the background
process_pool_executor = ProcessPoolExecutor()


@lifespan(app, container) # Both are needed because we are using the routes
@beartype
Expand Down Expand Up @@ -115,20 +109,10 @@ async def execute_system(
session_id = arguments.pop("session_id", None)
create_session_request = CreateSessionRequest(**arguments)

# In case sessions.create becomes asynchronous in the future
if asyncio.iscoroutinefunction(handler):
return await handler()

# Run the synchronous function in another process
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
process_pool_executor,
partial(
handler,
developer_id=developer_id,
session_id=session_id,
data=create_session_request,
),
return await handler(
developer_id=developer_id,
session_id=session_id,
data=create_session_request,
)

# Handle update session
Expand All @@ -137,20 +121,10 @@ async def execute_system(
session_id = arguments.pop("session_id")
update_session_request = UpdateSessionRequest(**arguments)

# In case sessions.update becomes asynchronous in the future
if asyncio.iscoroutinefunction(handler):
return await handler()

# Run the synchronous function in another process
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
process_pool_executor,
partial(
handler,
developer_id=developer_id,
session_id=session_id,
data=update_session_request,
),
return await handler(
developer_id=developer_id,
session_id=session_id,
data=update_session_request,
)

# Handle update user
Expand All @@ -159,31 +133,13 @@ async def execute_system(
user_id = arguments.pop("user_id")
update_user_request = UpdateUserRequest(**arguments)

# In case users.update becomes asynchronous in the future
if asyncio.iscoroutinefunction(handler):
return await handler()

# Run the synchronous function in another process
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
process_pool_executor,
partial(
handler,
developer_id=developer_id,
user_id=user_id,
data=update_user_request,
),
return await handler(
developer_id=developer_id,
user_id=user_id,
data=update_user_request,
)

# Handle regular operations
if asyncio.iscoroutinefunction(handler):
return await handler(**arguments)

# Run the synchronous function in another process
# FIXME: When the handler throws an exception, the process dies and the error is not captured. Instead it throws:
# "concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."
loop = asyncio.get_running_loop()
return await loop.run_in_executor(process_pool_executor, partial(handler, **arguments))
return await handler(**arguments)
except BaseException as e:
if activity.in_activity():
activity.logger.error(f"Error in execute_system_call: {e}")
Expand Down

0 comments on commit c567cdc

Please sign in to comment.