Skip to content

Commit

Permalink
Merge pull request #989 from julep-ai/f/migrate-workflows-to-pg
Browse files Browse the repository at this point in the history
feat(agents-api): migrate workflows to pg
  • Loading branch information
Ahmad-mtos authored Dec 25, 2024
2 parents 857bb96 + c0acb49 commit 3caf682
Show file tree
Hide file tree
Showing 96 changed files with 215 additions and 7,343 deletions.
2 changes: 1 addition & 1 deletion agents-api/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ COPY . ./
ENV PYTHONUNBUFFERED=1
ENV GUNICORN_CMD_ARGS="--capture-output --enable-stdio-inheritance"

ENTRYPOINT ["uv", "run", "gunicorn", "agents_api.web:app", "-c", "gunicorn_conf.py"]
ENTRYPOINT ["uv", "run", "--offline", "--no-sync", "gunicorn", "agents_api.web:app", "-c", "gunicorn_conf.py"]
2 changes: 1 addition & 1 deletion agents-api/Dockerfile.worker
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ COPY . ./
ENV PYTHONUNBUFFERED=1
ENV GUNICORN_CMD_ARGS="--capture-output --enable-stdio-inheritance"

ENTRYPOINT ["uv", "run", "python", "-m", "agents_api.worker"]
ENTRYPOINT ["uv", "run", "--offline", "--no-sync", "python", "-m", "agents_api.worker"]
2 changes: 2 additions & 0 deletions agents-api/agents_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@

with workflow.unsafe.imports_passed_through():
import msgpack as msgpack

import os
73 changes: 0 additions & 73 deletions agents-api/agents_api/activities/embed_docs.py

This file was deleted.

2 changes: 1 addition & 1 deletion agents-api/agents_api/activities/execute_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ..common.exceptions.tools import IntegrationExecutionException
from ..common.protocol.tasks import ExecutionInput, StepContext
from ..env import testing
from ..models.tools import get_tool_args_from_metadata
from ..queries.tools import get_tool_args_from_metadata


@beartype
Expand Down
3 changes: 1 addition & 2 deletions agents-api/agents_api/activities/task_steps/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# ruff: noqa: F401, F403, F405

from .base_evaluate import base_evaluate

# from .cozo_query_step import cozo_query_step
from .evaluate_step import evaluate_step
from .for_each_step import for_each_step
from .get_value_step import get_value_step
from .if_else_step import if_else_step
from .log_step import log_step
from .map_reduce_step import map_reduce_step
from .pg_query_step import pg_query_step
from .prompt_step import prompt_step
from .raise_complete_async import raise_complete_async
from .return_step import return_step
Expand Down
28 changes: 0 additions & 28 deletions agents-api/agents_api/activities/task_steps/cozo_query_step.py

This file was deleted.

38 changes: 38 additions & 0 deletions agents-api/agents_api/activities/task_steps/pg_query_step.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Any

from async_lru import alru_cache
from beartype import beartype
from temporalio import activity

from ... import queries
from ...clients.pg import create_db_pool
from ...env import pg_dsn, testing


@alru_cache(maxsize=1)
async def get_db_pool(dsn: str):
return await create_db_pool(dsn=dsn)


@beartype
async def pg_query_step(
query_name: str,
values: dict[str, Any],
dsn: str = pg_dsn,
) -> Any:
pool = await get_db_pool(dsn=dsn)

(module_name, name) = query_name.split(".")

module = getattr(queries, module_name)
query = getattr(module, name)
return await query(**values, connection_pool=pool)


# Note: This is here just for clarity. We could have just imported pg_query_step directly
# They do the same thing, so we dont need to mock the pg_query_step function
mock_pg_query_step = pg_query_step

pg_query_step = activity.defn(name="pg_query_step")(
pg_query_step if not testing else mock_pg_query_step
)
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
)
from ...exceptions import LastErrorInput, TooManyRequestsError
from ...queries.executions.create_execution_transition import (
create_execution_transition_async,
create_execution_transition,
)
from ..utils import RateLimiter

Expand Down Expand Up @@ -52,7 +52,7 @@ async def transition_step(

# Create transition
try:
transition = await create_execution_transition_async(
transition = await create_execution_transition(
developer_id=context.execution_input.developer_id,
execution_id=context.execution_input.execution.id,
task_id=context.execution_input.task.id,
Expand Down
44 changes: 22 additions & 22 deletions agents-api/agents_api/activities/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,28 +296,28 @@ def get_handler(system: SystemDef) -> Callable:
The base handler function.
"""

from ..models.agent.create_agent import create_agent as create_agent_query
from ..models.agent.delete_agent import delete_agent as delete_agent_query
from ..models.agent.get_agent import get_agent as get_agent_query
from ..models.agent.list_agents import list_agents as list_agents_query
from ..models.agent.update_agent import update_agent as update_agent_query
from ..models.docs.delete_doc import delete_doc as delete_doc_query
from ..models.docs.list_docs import list_docs as list_docs_query
from ..models.session.create_session import create_session as create_session_query
from ..models.session.delete_session import delete_session as delete_session_query
from ..models.session.get_session import get_session as get_session_query
from ..models.session.list_sessions import list_sessions as list_sessions_query
from ..models.session.update_session import update_session as update_session_query
from ..models.task.create_task import create_task as create_task_query
from ..models.task.delete_task import delete_task as delete_task_query
from ..models.task.get_task import get_task as get_task_query
from ..models.task.list_tasks import list_tasks as list_tasks_query
from ..models.task.update_task import update_task as update_task_query
from ..models.user.create_user import create_user as create_user_query
from ..models.user.delete_user import delete_user as delete_user_query
from ..models.user.get_user import get_user as get_user_query
from ..models.user.list_users import list_users as list_users_query
from ..models.user.update_user import update_user as update_user_query
from ..queries.agents.create_agent import create_agent as create_agent_query
from ..queries.agents.delete_agent import delete_agent as delete_agent_query
from ..queries.agents.get_agent import get_agent as get_agent_query
from ..queries.agents.list_agents import list_agents as list_agents_query
from ..queries.agents.update_agent import update_agent as update_agent_query
from ..queries.docs.delete_doc import delete_doc as delete_doc_query
from ..queries.docs.list_docs import list_docs as list_docs_query
from ..queries.sessions.create_session import create_session as create_session_query
from ..queries.sessions.delete_session import delete_session as delete_session_query
from ..queries.sessions.get_session import get_session as get_session_query
from ..queries.sessions.list_sessions import list_sessions as list_sessions_query
from ..queries.sessions.update_session import update_session as update_session_query
from ..queries.tasks.create_task import create_task as create_task_query
from ..queries.tasks.delete_task import delete_task as delete_task_query
from ..queries.tasks.get_task import get_task as get_task_query
from ..queries.tasks.list_tasks import list_tasks as list_tasks_query
from ..queries.tasks.update_task import update_task as update_task_query
from ..queries.users.create_user import create_user as create_user_query
from ..queries.users.delete_user import delete_user as delete_user_query
from ..queries.users.get_user import get_user as get_user_query
from ..queries.users.list_users import list_users as list_users_query
from ..queries.users.update_user import update_user as update_user_query
from ..routers.docs.create_doc import create_agent_doc, create_user_doc
from ..routers.docs.search_docs import search_agent_docs, search_user_docs
from ..routers.sessions.chat import chat
Expand Down
30 changes: 16 additions & 14 deletions agents-api/agents_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
@asynccontextmanager
async def lifespan(app: FastAPI):
# INIT POSTGRES #
db_dsn = os.environ.get("DB_DSN")
pg_dsn = os.environ.get("PG_DSN")

if not getattr(app.state, "postgres_pool", None):
app.state.postgres_pool = await create_db_pool(db_dsn)
app.state.postgres_pool = await create_db_pool(pg_dsn)

# INIT S3 #
s3_access_key = os.environ.get("S3_ACCESS_KEY")
Expand Down Expand Up @@ -67,7 +67,8 @@ async def lifespan(app: FastAPI):
lifespan=lifespan,
#
# Global dependencies
dependencies=[Depends(valid_content_length)],
# FIXME: This is blocking access to scalar
# dependencies=[Depends(valid_content_length)],
)

# Enable metrics
Expand All @@ -92,19 +93,20 @@ async def scalar_html():


# content-length validation
# FIXME: This is blocking access to scalar
# NOTE: This relies on client reporting the correct content-length header
# TODO: We should use streaming for large payloads
@app.middleware("http")
async def validate_content_length(
request: Request,
call_next: Callable[[Request], Coroutine[Any, Any, Response]],
):
content_length = request.headers.get("content-length")
# @app.middleware("http")
# async def validate_content_length(
# request: Request,
# call_next: Callable[[Request], Coroutine[Any, Any, Response]],
# ):
# content_length = request.headers.get("content-length")

if not content_length:
return Response(status_code=411, content="Content-Length header is required")
# if not content_length:
# return Response(status_code=411, content="Content-Length header is required")

if int(content_length) > max_payload_size:
return Response(status_code=413, content="Payload too large")
# if int(content_length) > max_payload_size:
# return Response(status_code=413, content="Payload too large")

return await call_next(request)
# return await call_next(request)
2 changes: 1 addition & 1 deletion agents-api/agents_api/clients/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
The `clients` module contains client classes and functions for interacting with various external services and APIs, abstracting the complexity of HTTP requests and API interactions to provide a simplified interface for the rest of the application.
- `cozo.py`: Handles communication with the Cozo service, facilitating operations such as retrieving product information.
- `pg.py`: Handles communication with the PostgreSQL service, facilitating operations such as retrieving product information.
- `embed.py`: Manages requests to an Embedding Service for text embedding functionalities.
- `openai.py`: Facilitates interaction with OpenAI's API for natural language processing tasks.
- `temporal.py`: Provides functionality for connecting to Temporal workflows, enabling asynchronous task execution and management.
Expand Down
4 changes: 2 additions & 2 deletions agents-api/agents_api/clients/pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import asyncpg

from ..env import db_dsn
from ..env import pg_dsn


async def _init_conn(conn):
Expand All @@ -16,5 +16,5 @@ async def _init_conn(conn):

async def create_db_pool(dsn: str | None = None):
return await asyncpg.create_pool(
dsn if dsn is not None else db_dsn, init=_init_conn
dsn if dsn is not None else pg_dsn, init=_init_conn
)
2 changes: 1 addition & 1 deletion agents-api/agents_api/common/protocol/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class PartialTransition(create_partial_model(CreateTransitionRequest)):

class ExecutionInput(BaseModel):
developer_id: UUID
execution: Execution
execution: Execution | None = None
task: TaskSpecDef
agent: Agent
agent_tools: list[Tool | CreateToolRequest]
Expand Down
2 changes: 1 addition & 1 deletion agents-api/agents_api/common/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
The `utils` module within the `agents-api` project offers a collection of utility functions designed to support various aspects of the application. This includes:
- `cozo.py`: Utilities for interacting with the Cozo API client, including data mutation processes.
- `pg.py`: Utilities for interacting with the PostgreSQL API client, including data mutation processes.
- `datetime.py`: Functions for handling date and time operations, ensuring consistent use of time zones and formats across the application.
- `json.py`: Custom JSON utilities, including a custom JSON encoder for handling specific object types like UUIDs, and a utility function for JSON serialization with support for default values for None objects.
Expand Down
26 changes: 0 additions & 26 deletions agents-api/agents_api/common/utils/cozo.py

This file was deleted.

Loading

0 comments on commit 3caf682

Please sign in to comment.