diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d06999db34c7..597e89df7b6e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -68,6 +68,10 @@ jobs: uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + - name: Explicitly install packages for Azure Cosmos DB + run: | + pip install azure-cosmos + pip install pytest-asyncio - name: Install packages and dependencies run: | python -m pip install --upgrade pip wheel diff --git a/.github/workflows/contrib-openai.yml b/.github/workflows/contrib-openai.yml index 1bf71115d6b4..9c4675ca5410 100644 --- a/.github/workflows/contrib-openai.yml +++ b/.github/workflows/contrib-openai.yml @@ -50,6 +50,10 @@ jobs: uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + - name: Explicitly install packages for Azure Cosmos DB + run: | + pip install azure-cosmos + pip install pytest-asyncio - name: Install packages and dependencies run: | docker --version diff --git a/.github/workflows/contrib-tests.yml b/.github/workflows/contrib-tests.yml index f8dd1d461865..40de5326fa29 100644 --- a/.github/workflows/contrib-tests.yml +++ b/.github/workflows/contrib-tests.yml @@ -42,6 +42,10 @@ jobs: run: | python -m pip install --upgrade pip wheel pip install pytest-cov>=5 + - name: Explicitly install packages for Azure Cosmos DB + run: | + pip install azure-cosmos + pip install pytest-asyncio - name: Install qdrant_client when python-version is 3.10 if: matrix.python-version == '3.10' run: | @@ -91,6 +95,9 @@ jobs: uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + - name: Explicitly install packages for Azure Cosmos DB + run: | + pip install azure-cosmos - name: Install packages and dependencies for all tests run: | python -m pip install --upgrade pip wheel @@ -138,6 +145,9 @@ jobs: uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + - name: Explicitly install packages for 
Azure Cosmos DB + run: | + pip install azure-cosmos - name: Install packages and dependencies for all tests run: | python -m pip install --upgrade pip wheel @@ -173,6 +183,9 @@ jobs: uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + - name: Explicitly install packages for Azure Cosmos DB + run: | + pip install azure-cosmos - name: Install packages and dependencies for all tests run: | python -m pip install --upgrade pip wheel @@ -208,6 +221,9 @@ jobs: uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + - name: Explicitly install packages for Azure Cosmos DB + run: | + pip install azure-cosmos - name: Install packages and dependencies for all tests run: | python -m pip install --upgrade pip wheel @@ -247,6 +263,9 @@ jobs: run: | python -m pip install --upgrade pip wheel pip install pytest-cov>=5 + - name: Explicitly install packages for Azure Cosmos DB + run: | + pip install azure-cosmos - name: Install packages and dependencies for WebSurfer run: | pip install -e .[websurfer] @@ -280,6 +299,9 @@ jobs: uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + - name: Explicitly install packages for Azure Cosmos DB + run: | + pip install azure-cosmos - name: Install packages and dependencies for all tests run: | python -m pip install --upgrade pip wheel @@ -359,6 +381,9 @@ jobs: uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + - name: Explicitly install packages for Azure Cosmos DB + run: | + pip install azure-cosmos - name: Install packages and dependencies for all tests run: | python -m pip install --upgrade pip wheel @@ -394,6 +419,9 @@ jobs: uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} + - name: Explicitly install packages for Azure Cosmos DB + run: | + pip install azure-cosmos - name: Install packages and dependencies for all tests run: | python -m pip install --upgrade pip wheel diff --git 
a/.github/workflows/samples-tools-tests.yml b/.github/workflows/samples-tools-tests.yml index e774e5cb0b1f..2a10aa2ea967 100644 --- a/.github/workflows/samples-tools-tests.yml +++ b/.github/workflows/samples-tools-tests.yml @@ -33,6 +33,10 @@ jobs: uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + - name: Explicitly install packages for Azure Cosmos DB + run: | + pip install azure-cosmos + pip install pytest-asyncio - name: Install packages and dependencies for all tests run: | python -m pip install --upgrade pip wheel diff --git a/autogen/logger/__init__.py b/autogen/logger/__init__.py index 6561cab4360f..97e2f4a1e4a3 100644 --- a/autogen/logger/__init__.py +++ b/autogen/logger/__init__.py @@ -1,4 +1,5 @@ +from .cosmos_db_logger import CosmosDBLogger from .logger_factory import LoggerFactory from .sqlite_logger import SqliteLogger -__all__ = ("LoggerFactory", "SqliteLogger") +__all__ = ("LoggerFactory", "SqliteLogger", "CosmosDBLogger") diff --git a/autogen/logger/cosmos_db_logger.py b/autogen/logger/cosmos_db_logger.py new file mode 100644 index 000000000000..04c456a33d37 --- /dev/null +++ b/autogen/logger/cosmos_db_logger.py @@ -0,0 +1,194 @@ +from __future__ import annotations + +import logging +import queue +import threading +import uuid +from typing import TYPE_CHECKING, Any, Dict, Optional, TypedDict, Union + +from azure.cosmos import CosmosClient, exceptions +from azure.cosmos.exceptions import CosmosHttpResponseError +from openai import AzureOpenAI, OpenAI +from openai.types.chat import ChatCompletion + +from autogen.logger.base_logger import BaseLogger +from autogen.logger.logger_utils import get_current_ts, to_dict + +if TYPE_CHECKING: + from autogen import Agent, ConversableAgent, OpenAIWrapper + +__all__ = ("CosmosDBLogger",) + +logger = logging.getLogger(__name__) + + +class CosmosDBLoggerConfig(TypedDict, total=False): + connection_string: str + database_id: str + container_id: str + + +class 
CosmosDBLogger(BaseLogger): + + log_queue: queue.Queue[Optional[Dict[str, Any]]] = queue.Queue() + + def __init__(self, config: CosmosDBLoggerConfig): + required_keys = ["connection_string", "database_id", "container_id"] + if not all(key in config for key in required_keys): + raise ValueError("Missing required configuration for Cosmos DB Logger") + + self.config = config + self.client = CosmosClient.from_connection_string(config["connection_string"]) + self.database_id = config.get("database_id", "autogen_logging") + self.database = self.client.get_database_client(self.database_id) + self.container_id = config.get("container_id", "Logs") + self.container = self.database.get_container_client(self.container_id) + self.session_id = str(uuid.uuid4()) + self.log_queue = queue.Queue() + self.logger_thread = threading.Thread(target=self._worker, daemon=True) + self.logger_thread.start() + + def start(self) -> str: + try: + self.database.create_container_if_not_exists(id=self.container_id, partition_key="/session_id") + except exceptions.CosmosHttpResponseError as e: + logger.error(f"Failed to create or access container {self.container_id}: {e}") + return self.session_id + + def _worker(self) -> None: + while True: + item = self.log_queue.get() + if item is None: # None is a signal to stop the worker thread + self.log_queue.task_done() + break + try: + self._process_log_entry(item) + except Exception as e: + logger.error(f"Error processing log entry: {e}") + finally: + self.log_queue.task_done() + + def _process_log_entry(self, document: Dict[str, Any]) -> None: + try: + self.container.upsert_item(document) + except exceptions.CosmosHttpResponseError as e: + logger.error(f"Failed to upsert document: {e}") + except Exception as e: + logger.error(f"Unexpected error during upsert: {str(e)}") + + def log_chat_completion( + self, + invocation_id: uuid.UUID, + client_id: int, + wrapper_id: int, + request: Dict[str, Any], + response: Union[str, ChatCompletion], + is_cached: int, 
+ cost: float, + start_time: str, + ) -> None: + # Debugging: Print when the method is called and the parameters it received + print("log_chat_completion called") + print("Invocation ID:", str(invocation_id)) + print("Client ID:", client_id) + print("Wrapper ID:", wrapper_id) + print("Request:", request) + print("Response:", response) + print("Is Cached:", is_cached) + print("Cost:", cost) + print("Start Time:", start_time) + # End debugging + document = { + "type": "chat_completion", + "invocation_id": str(invocation_id), + "client_id": client_id, + "wrapper_id": wrapper_id, + "session_id": self.session_id, + "request": to_dict(request), + "response": to_dict(response), + "is_cached": is_cached, + "cost": cost, + "start_time": start_time, + "end_time": get_current_ts(), + } + # Debugging: Print the document to be added to the queue + print("Document prepared for queue:", document) + # End debugging + + self.log_queue.put(document) + + # Debugging: Confirm the document has been added to the queue + print("Document added to queue") + # End debugging + + def log_event(self, source: Union[str, Agent], name: str, **kwargs: Dict[str, Any]) -> None: + document = { + "type": "event", + "session_id": self.session_id, + "event_name": name, + "timestamp": get_current_ts(), + "details": to_dict(kwargs), + } + + if isinstance(source, Agent): + document.update( + { + "source_id": id(source), + "source_name": source.name if hasattr(source, "name") else str(source), + "source_type": source.__class__.__name__, + "agent_module": source.__module__, + } + ) + else: + document.update( + { + "source_id": id(source), + "source_name": str(source), + "source_type": "System", + } + ) + + self.log_queue.put(document) + + def log_new_agent(self, agent: ConversableAgent, init_args: Dict[str, Any]) -> None: + document = { + "type": "new_agent", + "session_id": self.session_id, + "agent_id": id(agent), + "agent_name": agent.name, + "init_args": to_dict(init_args), + "timestamp": 
get_current_ts(), + } + self.container.upsert_item(document) + + def log_new_wrapper(self, wrapper: OpenAIWrapper, init_args: Dict[str, Any]) -> None: + document = { + "type": "new_wrapper", + "session_id": self.session_id, + "wrapper_id": id(wrapper), + "init_args": to_dict(init_args), + "timestamp": get_current_ts(), + } + self.log_queue.put(document) + + def log_new_client(self, client: Any, wrapper: OpenAIWrapper, init_args: Dict[str, Any]) -> None: + document = { + "type": "new_client", + "session_id": self.session_id, + "client_id": id(client), + "wrapper_id": id(wrapper), + "client_class": type(client).__name__, + "init_args": to_dict(init_args), + "timestamp": get_current_ts(), + } + self.log_queue.put(document) + + def stop(self) -> None: + self.log_queue.put(None) # Signal to stop the worker thread + self.logger_thread.join() # Wait for the worker thread to finish + #if self.client: + #self.client.close() # Explicitly close the Cosmos client + + def get_connection(self) -> None: + # Cosmos DB connection management is handled by the SDK. + return None diff --git a/autogen/logger/logger_factory.py b/autogen/logger/logger_factory.py index 8073c0c07d3e..897db691269d 100644 --- a/autogen/logger/logger_factory.py +++ b/autogen/logger/logger_factory.py @@ -3,6 +3,13 @@ from autogen.logger.base_logger import BaseLogger from autogen.logger.sqlite_logger import SqliteLogger +try: + from autogen.logger.cosmos_db_logger import CosmosDBLogger, CosmosDBLoggerConfig + + cosmos_imported = True +except ImportError: + cosmos_imported = False + __all__ = ("LoggerFactory",) @@ -14,5 +21,23 @@ def get_logger(logger_type: str = "sqlite", config: Optional[Dict[str, Any]] = N if logger_type == "sqlite": return SqliteLogger(config) + elif logger_type == "cosmos": + if not cosmos_imported: + raise ImportError( + "CosmosDBLogger and CosmosDBLoggerConfig could not be imported. Please ensure the cosmos package is installed by using pip install pyautogen[cosmosdb]." 
+ ) + # Validate configuration for CosmosDBLogger + required_keys = {"connection_string", "database_id", "container_id"} + if isinstance(config, dict) and required_keys.issubset(config.keys()): + cosmos_config: CosmosDBLoggerConfig = { + "connection_string": config["connection_string"], + "database_id": config["database_id"], + "container_id": config["container_id"], + } + return CosmosDBLogger(cosmos_config) # Config validated and passed as CosmosDBLoggerConfig + else: + raise ValueError( + "Provided configuration is missing required keys or is not properly formatted for CosmosDBLogger." + ) else: raise ValueError(f"[logger_factory] Unknown logger type: {logger_type}") diff --git a/autogen/runtime_logging.py b/autogen/runtime_logging.py index 1b9835eaa4b0..f2d2113792a8 100644 --- a/autogen/runtime_logging.py +++ b/autogen/runtime_logging.py @@ -49,9 +49,16 @@ def log_chat_completion( logger.error("[runtime logging] log_chat_completion: autogen logger is None") return - autogen_logger.log_chat_completion( - invocation_id, client_id, wrapper_id, request, response, is_cached, cost, start_time - ) + # Debug output + logger.debug(f"Logging chat completion for invocation_id: {invocation_id}, client_id: {client_id}, wrapper_id: {wrapper_id}") + + try: + autogen_logger.log_chat_completion( + invocation_id, client_id, wrapper_id, request, response, is_cached, cost, start_time + ) + logger.debug(f"Successfully logged chat completion for invocation_id: {invocation_id}") + except Exception as e: + logger.error(f"Error while logging chat completion: {e}", exc_info=True) def log_new_agent(agent: ConversableAgent, init_args: Dict[str, Any]) -> None: @@ -88,7 +95,7 @@ def log_new_client(client: Union[AzureOpenAI, OpenAI], wrapper: OpenAIWrapper, i def stop() -> None: global is_logging - if autogen_logger: + if autogen_logger is not None: autogen_logger.stop() is_logging = False diff --git a/notebook/agentchat_logging.ipynb b/notebook/agentchat_logging.ipynb index 
2ad19e7995a5..d26e61a99788 100644 --- a/notebook/agentchat_logging.ipynb +++ b/notebook/agentchat_logging.ipynb @@ -43,17 +43,28 @@ "\n", "import autogen\n", "from autogen import AssistantAgent, UserProxyAgent\n", + "from autogen.logger.logger_factory import LoggerFactory\n", "\n", "# Setup API key. Add your own API key to config file or environment variable\n", "llm_config = {\n", - " \"config_list\": autogen.config_list_from_json(\n", - " env_or_file=\"OAI_CONFIG_LIST\",\n", - " ),\n", + " \"config_list\": autogen.config_list_from_json(env_or_file=\"OAI_CONFIG_LIST\"),\n", " \"temperature\": 0.9,\n", "}\n", "\n", + "# Choose logger type based on environment or user input\n", + "logger_type = \"sqlite\" # or 'cosmos', switch based on user requirement\n", + "if logger_type == \"cosmos\":\n", + " logger_config = {\n", + " \"connection_string\": \"Your_Cosmos_DB_Connection_String\",\n", + " \"database_id\": \"Your_Database_ID\",\n", + " \"container_id\": \"Your_Container_ID\",\n", + " }\n", + "else:\n", + " logger_config = {\"dbname\": \"logs.db\"}\n", + "\n", "# Start logging\n", - "logging_session_id = autogen.runtime_logging.start(config={\"dbname\": \"logs.db\"})\n", + "logger = LoggerFactory.get_logger(logger_type, logger_config)\n", + "logging_session_id = logger.start()\n", "print(\"Logging session ID: \" + str(logging_session_id))\n", "\n", "# Create an agent workflow and run it\n", @@ -75,9 +86,10 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Getting Data from the SQLite Database \n", + "## Getting Data from the Database \n", "\n", - "`logs.db` should be generated, by default it's using SQLite database. You can view the data with GUI tool like `sqlitebrowser`, using SQLite command line shell or using python script:\n", + "Depending on the chosen logger, data retrieval methods will differ.\n", + "`logs.db` should be generated if using SQLite database. 
You can view the data with GUI tool like `sqlitebrowser`, using SQLite command line shell or using python script:\n", "\n" ] }, @@ -87,17 +99,38 @@ "metadata": {}, "outputs": [], "source": [ - "def get_log(dbname=\"logs.db\", table=\"chat_completions\"):\n", + "# If using SQLite\n", + "def get_sqlite_log(dbname=\"logs.db\", table=\"chat_completions\"):\n", " import sqlite3\n", "\n", " con = sqlite3.connect(dbname)\n", - " query = f\"SELECT * from {table}\"\n", + " query = f\"SELECT * FROM {table}\"\n", " cursor = con.execute(query)\n", " rows = cursor.fetchall()\n", " column_names = [description[0] for description in cursor.description]\n", " data = [dict(zip(column_names, row)) for row in rows]\n", " con.close()\n", - " return data" + " return data\n", + "\n", + "\n", + "log_data = get_sqlite_log()\n", + "log_data_df = pd.DataFrame(log_data)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# If using Azure Cosmos DB\n", + "def get_cosmos_log(container, query=\"SELECT * FROM c\"):\n", + " items = list(container.query_items(query=query, enable_cross_partition_query=True))\n", + " return items\n", + "\n", + "\n", + "log_data = get_cosmos_log(logger.get_connection())\n", + "log_data_df = pd.DataFrame(log_data)" ] }, { @@ -227,9 +260,6 @@ " return json.loads(s)\n", "\n", "\n", - "log_data = get_log()\n", - "log_data_df = pd.DataFrame(log_data)\n", - "\n", "log_data_df[\"total_tokens\"] = log_data_df.apply(\n", " lambda row: str_to_dict(row[\"response\"])[\"usage\"][\"total_tokens\"], axis=1\n", ")\n", @@ -249,7 +279,7 @@ "source": [ "## Computing Cost \n", "\n", - "One use case of logging data is to compute the cost of a session." + "One use case of logging data is to compute the cost of a session. The code below is only for SQLite, adjust if using for Azure Cosmos DB." 
] }, { diff --git a/test/test_logging_cosmos_db.py b/test/test_logging_cosmos_db.py new file mode 100644 index 000000000000..4232728419f4 --- /dev/null +++ b/test/test_logging_cosmos_db.py @@ -0,0 +1,76 @@ +import json +import uuid +from unittest.mock import Mock, patch + +import pytest +from openai import AzureOpenAI + +import autogen.logger.logger_factory as logger_factory +from autogen.logger.logger_utils import get_current_ts, to_dict +from autogen.runtime_logging import log_chat_completion, start, stop + +# Sample data for testing +SAMPLE_CHAT_REQUEST = json.loads( + """ +{ + "messages": [ + { + "content": "Can you explain the difference between eigenvalues and singular values again?", + "role": "assistant" + } + ], + "model": "gpt-4" +} +""" +) + +SAMPLE_CHAT_RESPONSE = json.loads( + """ +{ + "id": "chatcmpl-8k57oSg1fz2JwpMcEOWMqUvwjf0cb", + "choices": [ + { + "finish_reason": "stop", + "index": 0, + "message": { + "content": "Eigenvalues are...", + "role": "assistant" + } + } + ], + "model": "gpt-4" +} +""" +) + +@pytest.fixture(scope="function") +def cosmos_db_setup(): + autogen_logger = Mock() + autogen_logger.log_queue.put = Mock() + + config = { + "connection_string": "AccountEndpoint=https://example.documents.azure.com:443/;AccountKey=dGVzdA==", + "database_id": "TestDatabase", + "container_id": "TestContainer", + } + + # Patch the get_logger method of the LoggerFactory object + with patch.object(logger_factory.LoggerFactory, 'get_logger', return_value=autogen_logger): + start(logger_type="cosmos", config=config) + yield autogen_logger + stop() + +class TestCosmosDBLogging: + def get_sample_chat_completion(self, response): + return { + "invocation_id": str(uuid.uuid4()), + "client_id": 140609438577184, + "wrapper_id": 140610167717744, + "request": SAMPLE_CHAT_REQUEST, + "response": response, + "is_cached": 0, + "cost": 0.347, + "start_time": get_current_ts(), + } + + diff --git a/test/test_logging.py b/test/test_logging_sqlite.py similarity index 100% rename from
test/test_logging.py rename to test/test_logging_sqlite.py