Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions openai_agents/memory/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Session Memory Examples

Session memory examples for OpenAI Agents SDK integrated with Temporal workflows.

*Adapted from [OpenAI Agents SDK session memory examples](https://github.com/openai/openai-agents-python/tree/main/examples/memory)*

Before running these examples, be sure to review the [prerequisites and background on the integration](../README.md).

## Running the Examples

### PostgreSQL Session Memory

This example uses a PostgreSQL database to store session data.

You need can use the standard PostgreSQL environment variables to configure the database connection.
These include `PGDATABASE`, `PGUSER`, `PGPASSWORD`, `PGHOST`, and `PGPORT`.
We also support the `DATABASE_URL` environment variable.

To confirm that your environment is configured correctly, just run the `psql` command after setting the environment variables.
For example:
```bash
PGDATABASE=postgres psql
```

Start the worker:
```bash
PGDATABASE=postgres uv run openai_agents/memory/run_postgres_session_worker.py
```

Then run the workflow:

```bash
uv run openai_agents/memory/run_postgres_session_workflow.py
```
84 changes: 84 additions & 0 deletions openai_agents/memory/db_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import json
import asyncpg
from typing import Callable, Awaitable, TypeVar
from temporalio import activity
from pydantic import BaseModel

T = TypeVar("T")


class IdempotenceHelper(BaseModel):
table_name: str

def __init__(self, table_name: str):
super().__init__(table_name=table_name)
self.table_name = table_name

async def create_table(self, conn: asyncpg.Connection) -> None:
await conn.execute(
f"""
CREATE TABLE IF NOT EXISTS {self.table_name} (
run_id UUID NOT NULL,
activity_id TEXT NOT NULL,
operation_started_at TIMESTAMP NOT NULL,
operation_completed_at TIMESTAMP NULL,
operation_result TEXT NULL,
PRIMARY KEY (run_id, activity_id)
)
"""
)

async def idempotent_update(
self,
conn: asyncpg.Connection,
operation: Callable[[asyncpg.Connection], Awaitable[T]],
) -> T | None:
"""Insert idempotence row; on conflict, read and return existing result.

The operation must be an async callable of the form:
async def op(conn: asyncpg.Connection) -> T
"""
activity_info = activity.info()
run_id = activity_info.workflow_run_id
activity_id = activity_info.activity_id

async with conn.transaction():
did_insert = await conn.fetchrow(
(
f"INSERT INTO {self.table_name} "
f"(run_id, activity_id, operation_started_at) "
f"VALUES ($1, $2, NOW()) "
f"ON CONFLICT (run_id, activity_id) DO NOTHING "
f"RETURNING 1"
),
run_id,
activity_id,
)

if did_insert:
res = await operation(conn)

if hasattr(res, "model_dump_json"):
op_result = res.model_dump_json()
else:
op_result = json.dumps(res)

await conn.execute(
f"UPDATE {self.table_name} SET operation_completed_at = NOW(), operation_result = $1 WHERE run_id = $2 AND activity_id = $3",
op_result,
run_id,
activity_id,
)
return res
else:
row = await conn.fetchrow(
f"SELECT operation_result FROM {self.table_name} WHERE run_id = $1 AND activity_id = $2",
run_id,
activity_id,
)
if not row or row["operation_result"] is None:
return None
try:
return json.loads(row["operation_result"])
except Exception:
return row["operation_result"]
Loading
Loading