diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f6df28088f51..2abbc6bd68ec 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -42,8 +42,13 @@ jobs: pip install -e . python -c "import autogen" pip install pytest mock + - name: Install optional dependencies for code executors + # code executors auto skip without deps, so only run for python 3.11 + if: matrix.python-version == '3.11' + run: | pip install jupyter-client ipykernel python -m ipykernel install --user --name python3 + pip install -e ".[local-jupyter-exec]" - name: Set AUTOGEN_USE_DOCKER based on OS shell: bash run: | diff --git a/autogen/coding/base.py b/autogen/coding/base.py index 936d3ee25366..0bd373157c9f 100644 --- a/autogen/coding/base.py +++ b/autogen/coding/base.py @@ -92,3 +92,12 @@ def restart(self) -> None: This method is called when the agent is reset. """ ... # pragma: no cover + + +class IPythonCodeResult(CodeResult): + """(Experimental) A code result class for IPython code executor.""" + + output_files: List[str] = Field( + default_factory=list, + description="The list of files that the executed code blocks generated.", + ) diff --git a/autogen/coding/embedded_ipython_code_executor.py b/autogen/coding/embedded_ipython_code_executor.py index c85798f75039..a83dab23327a 100644 --- a/autogen/coding/embedded_ipython_code_executor.py +++ b/autogen/coding/embedded_ipython_code_executor.py @@ -1,6 +1,7 @@ import base64 import json import os +from pathlib import Path import re import uuid from queue import Empty @@ -11,19 +12,10 @@ from pydantic import BaseModel, Field, field_validator from ..agentchat.agent import LLMAgent -from .base import CodeBlock, CodeExtractor, CodeResult +from .base import CodeBlock, CodeExtractor, IPythonCodeResult from .markdown_code_extractor import MarkdownCodeExtractor -__all__ = ("EmbeddedIPythonCodeExecutor", "IPythonCodeResult") - - -class IPythonCodeResult(CodeResult): - """(Experimental) A code result class for IPython code executor.""" - - output_files: List[str] = Field( - default_factory=list, - description="The list of files that the executed code blocks generated.", - ) +__all__ = "EmbeddedIPythonCodeExecutor" class EmbeddedIPythonCodeExecutor(BaseModel): @@ -126,6 +118,8 @@ def __init__(self, **kwargs: Any): self._kernel_client = self._kernel_manager.client() self._kernel_client.start_channels() self._timeout = self.timeout + self._kernel_name = self.kernel_name + self._output_dir = Path(self.output_dir) @property def user_capability(self) -> "EmbeddedIPythonCodeExecutor.UserCapability": diff --git a/autogen/coding/factory.py b/autogen/coding/factory.py index 953de5906dd1..ceb01ca3dfaa 100644 --- a/autogen/coding/factory.py +++ b/autogen/coding/factory.py @@ -37,5 +37,9 @@ def create(code_execution_config: Dict[str, Any]) -> CodeExecutor: from .local_commandline_code_executor import LocalCommandlineCodeExecutor return LocalCommandlineCodeExecutor(**code_execution_config.get("commandline-local", {})) + elif executor == "jupyter-local": + from .jupyter_code_executor import LocalJupyterCodeExecutor + + return LocalJupyterCodeExecutor(**code_execution_config.get("jupyter-local", {})) else: raise ValueError(f"Unknown code executor {executor}") diff --git a/autogen/coding/jupyter/__init__.py b/autogen/coding/jupyter/__init__.py new file mode 100644 index 000000000000..96c8cf4a65cc --- /dev/null +++ b/autogen/coding/jupyter/__init__.py @@ -0,0 +1,5 @@ +from .base import JupyterConnectable, JupyterConnectionInfo +from .jupyter_client import JupyterClient +from .local_jupyter_server import LocalJupyterServer + +__all__ = ["JupyterConnectable", "JupyterConnectionInfo", "JupyterClient", "LocalJupyterServer"] diff --git a/autogen/coding/jupyter/base.py b/autogen/coding/jupyter/base.py new file mode 100644 index 000000000000..8e86897249ef --- /dev/null +++ b/autogen/coding/jupyter/base.py @@ -0,0 +1,21 @@ +from dataclasses import dataclass +from typing import Optional, Protocol, runtime_checkable + + +@dataclass +class JupyterConnectionInfo: + """(Experimental)""" + + host: str + use_https: bool + port: int + token: Optional[str] + + +@runtime_checkable +class JupyterConnectable(Protocol): + """(Experimental)""" + + @property + def connection_info(self) -> JupyterConnectionInfo: + pass diff --git a/autogen/coding/jupyter/jupyter_client.py b/autogen/coding/jupyter/jupyter_client.py new file mode 100644 index 000000000000..edecc415cd17 --- /dev/null +++ b/autogen/coding/jupyter/jupyter_client.py @@ -0,0 +1,200 @@ +from __future__ import annotations + +from dataclasses import dataclass +from types import TracebackType +from typing import Any, Dict, List, Optional, cast +import sys + +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing_extensions import Self + +import json +import uuid +import datetime +import requests + +import websocket +from websocket import WebSocket + +from .base import JupyterConnectionInfo + + +class JupyterClient: + """(Experimental) A client for communicating with a Jupyter gateway server.""" + + def __init__(self, connection_info: JupyterConnectionInfo): + self._connection_info = connection_info + + def _get_headers(self) -> Dict[str, str]: + if self._connection_info.token is None: + return {} + return {"Authorization": f"token {self._connection_info.token}"} + + def _get_api_base_url(self) -> str: + protocol = "https" if self._connection_info.use_https else "http" + return f"{protocol}://{self._connection_info.host}:{self._connection_info.port}" + + def _get_ws_base_url(self) -> str: + return f"ws://{self._connection_info.host}:{self._connection_info.port}" + + def list_kernel_specs(self) -> Dict[str, Dict[str, str]]: + response = requests.get(f"{self._get_api_base_url()}/api/kernelspecs", headers=self._get_headers()) + return cast(Dict[str, Dict[str, str]], response.json()) + + def list_kernels(self) -> List[Dict[str, str]]: + response = requests.get(f"{self._get_api_base_url()}/api/kernels", headers=self._get_headers()) + return cast(List[Dict[str, str]], response.json()) + + def start_kernel(self, kernel_spec_name: str) -> str: + """Start a new kernel. + + Args: + kernel_spec_name (str): Name of the kernel spec to start + + Returns: + str: ID of the started kernel + """ + + response = requests.post( + f"{self._get_api_base_url()}/api/kernels", + headers=self._get_headers(), + json={"name": kernel_spec_name}, + ) + return cast(str, response.json()["id"]) + + def restart_kernel(self, kernel_id: str) -> None: + response = requests.post( + f"{self._get_api_base_url()}/api/kernels/{kernel_id}/restart", headers=self._get_headers() + ) + response.raise_for_status() + + def get_kernel_client(self, kernel_id: str) -> JupyterKernelClient: + ws_url = f"{self._get_ws_base_url()}/api/kernels/{kernel_id}/channels" + ws = websocket.create_connection(ws_url, header=self._get_headers()) + return JupyterKernelClient(ws) + + +class JupyterKernelClient: + """(Experimental) A client for communicating with a Jupyter kernel.""" + + @dataclass + class ExecutionResult: + @dataclass + class DataItem: + mime_type: str + data: str + + is_ok: bool + output: str + data_items: List[DataItem] + + def __init__(self, websocket: WebSocket): + self._session_id: str = uuid.uuid4().hex + self._websocket: WebSocket = websocket + + def __enter__(self) -> Self: + return self + + def __exit__( + self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + ) -> None: + self._websocket.close() + + def _send_message(self, *, content: Dict[str, Any], channel: str, message_type: str) -> str: + timestamp = datetime.datetime.now().isoformat() + message_id = uuid.uuid4().hex + message = { + "header": { + "username": "autogen", + "version": "5.0", + "session": self._session_id, + "msg_id": message_id, + "msg_type": message_type, + "date": timestamp, + }, + "parent_header": {}, + "channel": channel, + "content": content, + "metadata": {}, + "buffers": {}, + } + self._websocket.send_text(json.dumps(message)) + return message_id + + def _receive_message(self, timeout_seconds: Optional[float]) -> Optional[Dict[str, Any]]: + self._websocket.settimeout(timeout_seconds) + try: + data = self._websocket.recv() + if isinstance(data, bytes): + data = data.decode("utf-8") + return cast(Dict[str, Any], json.loads(data)) + except websocket.WebSocketTimeoutException: + return None + + def wait_for_ready(self, timeout_seconds: Optional[float] = None) -> bool: + message_id = self._send_message(content={}, channel="shell", message_type="kernel_info_request") + while True: + message = self._receive_message(timeout_seconds) + # This means we timed out with no new messages. + if message is None: + return False + if ( + message.get("parent_header", {}).get("msg_id") == message_id + and message["msg_type"] == "kernel_info_reply" + ): + return True + + def execute(self, code: str, timeout_seconds: Optional[float] = None) -> ExecutionResult: + message_id = self._send_message( + content={ + "code": code, + "silent": False, + "store_history": True, + "user_expressions": {}, + "allow_stdin": False, + "stop_on_error": True, + }, + channel="shell", + message_type="execute_request", + ) + + text_output = [] + data_output = [] + while True: + message = self._receive_message(timeout_seconds) + if message is None: + return JupyterKernelClient.ExecutionResult( + is_ok=False, output="ERROR: Timeout waiting for output from code block.", data_items=[] + ) + + # Ignore messages that are not for this execution. + if message.get("parent_header", {}).get("msg_id") != message_id: + continue + + msg_type = message["msg_type"] + content = message["content"] + if msg_type in ["execute_result", "display_data"]: + for data_type, data in content["data"].items(): + if data_type == "text/plain": + text_output.append(data) + elif data_type.startswith("image/") or data_type == "text/html": + data_output.append(self.ExecutionResult.DataItem(mime_type=data_type, data=data)) + else: + text_output.append(json.dumps(data)) + elif msg_type == "stream": + text_output.append(content["text"]) + elif msg_type == "error": + # Output is an error. + return JupyterKernelClient.ExecutionResult( + is_ok=False, + output=f"ERROR: {content['ename']}: {content['evalue']}\n{content['traceback']}", + data_items=[], + ) + if msg_type == "status" and content["execution_state"] == "idle": + break + + return JupyterKernelClient.ExecutionResult( + is_ok=True, output="\n".join([str(output) for output in text_output]), data_items=data_output + ) diff --git a/autogen/coding/jupyter/local_jupyter_server.py b/autogen/coding/jupyter/local_jupyter_server.py new file mode 100644 index 000000000000..decbb3f430ed --- /dev/null +++ b/autogen/coding/jupyter/local_jupyter_server.py @@ -0,0 +1,148 @@ +from __future__ import annotations +from types import TracebackType + +from typing import Optional, Union, cast +import subprocess +import signal +import sys +import json +import secrets +import socket +import atexit + +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing_extensions import Self + +from .base import JupyterConnectable, JupyterConnectionInfo +from .jupyter_client import JupyterClient + + +def _get_free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + return cast(int, s.getsockname()[1]) + + +class LocalJupyterServer(JupyterConnectable): + class GenerateToken: + pass + + def __init__( + self, + ip: str = "127.0.0.1", + port: Optional[int] = None, + token: Union[str, GenerateToken] = GenerateToken(), + log_file: str = "jupyter_gateway.log", + log_level: str = "INFO", + log_max_bytes: int = 1048576, + log_backup_count: int = 3, + ): + # Remove as soon as https://github.com/jupyter-server/kernel_gateway/issues/398 is fixed + if sys.platform == "win32": + raise ValueError("LocalJupyterServer is not supported on Windows due to kernelgateway bug.") + + # Check Jupyter gateway server is installed + try: + subprocess.run( + [sys.executable, "-m", "jupyter", "kernelgateway", "--version"], + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + except subprocess.CalledProcessError: + raise ValueError( + "Jupyter gateway server is not installed. Please install it with `pip install jupyter_kernel_gateway`." + ) + + self.ip = ip + if port is None: + port = _get_free_port() + self.port = port + + if isinstance(token, LocalJupyterServer.GenerateToken): + token = secrets.token_hex(32) + + self.token = token + logging_config = { + "handlers": { + "file": { + "class": "logging.handlers.RotatingFileHandler", + "level": log_level, + "maxBytes": log_max_bytes, + "backupCount": log_backup_count, + "filename": log_file, + } + }, + "loggers": {"KernelGatewayApp": {"level": log_level, "handlers": ["file", "console"]}}, + } + + # Run Jupyter gateway server with detached subprocess + args = [ + sys.executable, + "-m", + "jupyter", + "kernelgateway", + "--KernelGatewayApp.ip", + ip, + "--KernelGatewayApp.port", + str(port), + "--KernelGatewayApp.auth_token", + token, + "--JupyterApp.answer_yes", + "true", + "--JupyterApp.logging_config", + json.dumps(logging_config), + "--JupyterWebsocketPersonality.list_kernels", + "true", + ] + self._subprocess = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + # Satisfy mypy, we know this is not None because we passed PIPE + assert self._subprocess.stderr is not None + # Read stderr until we see "is available at" or the process has exited with an error + stderr = "" + while True: + result = self._subprocess.poll() + if result is not None: + stderr += self._subprocess.stderr.read() + print(f"token=[[[[{token}]]]]") + raise ValueError(f"Jupyter gateway server failed to start with exit code: {result}. stderr:\n{stderr}") + line = self._subprocess.stderr.readline() + stderr += line + if "is available at" in line: + break + + # Poll the subprocess to check if it is still running + result = self._subprocess.poll() + if result is not None: + raise ValueError( + f"Jupyter gateway server failed to start. Please check the logs ({log_file}) for more information." + ) + + atexit.register(self.stop) + + def stop(self) -> None: + if self._subprocess.poll() is None: + if sys.platform == "win32": + self._subprocess.send_signal(signal.CTRL_C_EVENT) + else: + self._subprocess.send_signal(signal.SIGINT) + self._subprocess.wait() + + @property + def connection_info(self) -> JupyterConnectionInfo: + return JupyterConnectionInfo(host=self.ip, use_https=False, port=self.port, token=self.token) + + def get_client(self) -> JupyterClient: + return JupyterClient(self.connection_info) + + def __enter__(self) -> Self: + return self + + def __exit__( + self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + ) -> None: + self.stop() diff --git a/autogen/coding/jupyter_code_executor.py b/autogen/coding/jupyter_code_executor.py new file mode 100644 index 000000000000..551aea18aeba --- /dev/null +++ b/autogen/coding/jupyter_code_executor.py @@ -0,0 +1,222 @@ +import base64 +import json +import os +from pathlib import Path +import re +import uuid +from typing import Any, ClassVar, List, Union + +from pydantic import Field + + +from ..agentchat.agent import LLMAgent +from .base import CodeBlock, CodeExecutor, CodeExtractor, CodeResult, IPythonCodeResult +from .markdown_code_extractor import MarkdownCodeExtractor +from .jupyter import JupyterConnectable, JupyterConnectionInfo, LocalJupyterServer, JupyterClient + +__all__ = ("JupyterCodeExecutor", "LocalJupyterCodeExecutor") + + +class JupyterCodeExecutor(CodeExecutor): + """(Experimental) A code executor class that executes code statefully using an embedded + IPython kernel managed by this class. + + **This will execute LLM generated code on the local machine.** + + Each execution is stateful and can access variables created from previous + executions in the same session. The kernel must be installed before using + this class. The kernel can be installed using the following command: + `python -m ipykernel install --user --name {kernel_name}` + where `kernel_name` is the name of the kernel to install. + + Args: + timeout (int): The timeout for code execution, by default 60. + kernel_name (str): The kernel name to use. Make sure it is installed. + By default, it is "python3". + output_dir (str): The directory to save output files, by default ".". + system_message_update (str): The system message update to add to the + agent that produces code. By default it is + `JupyterCodeExecutor.DEFAULT_SYSTEM_MESSAGE_UPDATE`. + """ + + DEFAULT_SYSTEM_MESSAGE_UPDATE: ClassVar[ + str + ] = """ +# IPython Coding Capability +You have been given coding capability to solve tasks using Python code in a stateful IPython kernel. +You are responsible for writing the code, and the user is responsible for executing the code. + +When you write Python code, put the code in a markdown code block with the language set to Python. +For example: +```python +x = 3 +``` +You can use the variable `x` in subsequent code blocks. +```python +print(x) +``` + +Write code incrementally and leverage the statefulness of the kernel to avoid repeating code. +Import libraries in a separate code block. +Define a function or a class in a separate code block. +Run code that produces output in a separate code block. +Run code that involves expensive operations like download, upload, and call external APIs in a separate code block. + +When your code produces an output, the output will be returned to you. +Because you have limited conversation memory, if your code creates an image, +the output will be a path to the image instead of the image itself. +""" + + class UserCapability: + """(Experimental) An AgentCapability class that gives agent ability use a stateful + IPython code executor. This capability can be added to an agent using + the `add_to_agent` method which append a system message update to the + agent's system message.""" + + def __init__(self, system_message_update: str): + self._system_message_update = system_message_update + + def add_to_agent(self, agent: LLMAgent) -> None: + """Add this capability to an agent by appending a system message + update to the agent's system message. + + **Currently we do not check for conflicts with existing content in + the agent's system message.** + + Args: + agent (LLMAgent): The agent to add the capability to. + """ + agent.update_system_message(agent.system_message + self._system_message_update) + + def __init__( + self, + jupyter_server: Union[JupyterConnectable, JupyterConnectionInfo], + kernel_name: str = "python3", + timeout: int = 60, + output_dir: Union[Path, str] = Path("."), + system_message_update: str = DEFAULT_SYSTEM_MESSAGE_UPDATE, + ): + if timeout < 1: + raise ValueError("Timeout must be greater than or equal to 1.") + + if isinstance(output_dir, str): + output_dir = Path(output_dir) + + if not output_dir.exists(): + raise ValueError(f"Output directory {output_dir} does not exist.") + + if isinstance(jupyter_server, JupyterConnectable): + self._connection_info = jupyter_server.connection_info + elif isinstance(jupyter_server, JupyterConnectionInfo): + self._connection_info = jupyter_server + else: + raise ValueError("jupyter_server must be a JupyterConnectable or JupyterConnectionInfo.") + + self._jupyter_client = JupyterClient(self._connection_info) + available_kernels = self._jupyter_client.list_kernel_specs() + if kernel_name not in available_kernels["kernelspecs"]: + raise ValueError(f"Kernel {kernel_name} is not installed.") + + self._kernel_id = self._jupyter_client.start_kernel(kernel_name) + self._kernel_name = kernel_name + self._jupyter_kernel_client = self._jupyter_client.get_kernel_client(self._kernel_id) + self._timeout = timeout + self._output_dir = output_dir + self._system_message_update = system_message_update + + @property + def user_capability(self) -> "JupyterCodeExecutor.UserCapability": + """(Experimental) Export a user capability for this executor that can be added to + an agent using the `add_to_agent` method.""" + return JupyterCodeExecutor.UserCapability(self._system_message_update) + + @property + def code_extractor(self) -> CodeExtractor: + """(Experimental) Export a code extractor that can be used by an agent.""" + return MarkdownCodeExtractor() + + def execute_code_blocks(self, code_blocks: List[CodeBlock]) -> IPythonCodeResult: + """(Experimental) Execute a list of code blocks and return the result. + + This method executes a list of code blocks as cells in an IPython kernel + managed by this class. + See: https://jupyter-client.readthedocs.io/en/stable/messaging.html + for the message protocol. + + Args: + code_blocks (List[CodeBlock]): A list of code blocks to execute. + + Returns: + IPythonCodeResult: The result of the code execution. + """ + self._jupyter_kernel_client.wait_for_ready() + outputs = [] + output_files = [] + for code_block in code_blocks: + code = self._process_code(code_block.code) + result = self._jupyter_kernel_client.execute(code, timeout_seconds=self._timeout) + if result.is_ok: + outputs.append(result.output) + for data in result.data_items: + if data.mime_type == "image/png": + path = self._save_image(data.data) + outputs.append(f"Image data saved to {path}") + output_files.append(path) + elif data.mime_type == "text/html": + path = self._save_html(data.data) + outputs.append(f"HTML data saved to {path}") + output_files.append(path) + else: + outputs.append(json.dumps(data.data)) + else: + return IPythonCodeResult( + exit_code=1, + output=f"ERROR: {result.output}", + ) + + return IPythonCodeResult( + exit_code=0, output="\n".join([str(output) for output in outputs]), output_files=output_files + ) + + def restart(self) -> None: + """(Experimental) Restart a new session.""" + self._jupyter_client.restart_kernel(self._kernel_id) + self._jupyter_kernel_client = self._jupyter_client.get_kernel_client(self._kernel_id) + + def _save_image(self, image_data_base64: str) -> str: + """Save image data to a file.""" + image_data = base64.b64decode(image_data_base64) + # Randomly generate a filename. + filename = f"{uuid.uuid4().hex}.png" + path = os.path.join(self._output_dir, filename) + with open(path, "wb") as f: + f.write(image_data) + return os.path.abspath(path) + + def _save_html(self, html_data: str) -> str: + """Save html data to a file.""" + # Randomly generate a filename. + filename = f"{uuid.uuid4().hex}.html" + path = os.path.join(self._output_dir, filename) + with open(path, "w") as f: + f.write(html_data) + return os.path.abspath(path) + + def _process_code(self, code: str) -> str: + """Process code before execution.""" + # Find lines that start with `! pip install` and make sure "-qqq" flag is added. + lines = code.split("\n") + for i, line in enumerate(lines): + # use regex to find lines that start with `! pip install` or `!pip install`. + match = re.search(r"^! ?pip install", line) + if match is not None: + if "-qqq" not in line: + lines[i] = line.replace(match.group(0), match.group(0) + " -qqq") + return "\n".join(lines) + + +class LocalJupyterCodeExecutor(JupyterCodeExecutor): + def __init__(self, **kwargs: Any): + """Creates a LocalJupyterServer and passes it to JupyterCodeExecutor, see JupyterCodeExecutor for args""" + jupyter_server = LocalJupyterServer() + super().__init__(jupyter_server=jupyter_server, **kwargs) diff --git a/setup.py b/setup.py index a2577271cc1d..768ad708cf42 100644 --- a/setup.py +++ b/setup.py @@ -55,7 +55,11 @@ "graph": ["networkx", "matplotlib"], "websurfer": ["beautifulsoup4", "markdownify", "pdfminer.six", "pathvalidate"], "redis": ["redis"], + # Dependencies for EmbeddedIPythonExecutor, to be removed once upstream bug fixed + # https://github.com/jupyter-server/kernel_gateway/issues/398 "ipython": ["jupyter-client>=8.6.0", "ipykernel>=6.29.0"], + # Dependencies for LocalJupyterExecutor + "local-jupyter-exec": ["jupyter-kernel-gateway", "websocket-client", "requests", "ipykernel"], }, classifiers=[ "Programming Language :: Python :: 3", diff --git a/test/coding/test_embedded_ipython_code_executor.py b/test/coding/test_embedded_ipython_code_executor.py index a43f103d4e24..fcd423497aaa 100644 --- a/test/coding/test_embedded_ipython_code_executor.py +++ b/test/coding/test_embedded_ipython_code_executor.py @@ -1,4 +1,6 @@ import os +import sys +from pathlib import Path import tempfile from typing import Dict, Union import uuid @@ -11,50 +13,62 @@ try: from autogen.coding.embedded_ipython_code_executor import EmbeddedIPythonCodeExecutor + from autogen.coding.jupyter_code_executor import LocalJupyterCodeExecutor + + # Skip on windows due to kernelgateway bug https://github.com/jupyter-server/kernel_gateway/issues/398 + if sys.platform == "win32": + classes_to_test = [EmbeddedIPythonCodeExecutor] + else: + classes_to_test = [EmbeddedIPythonCodeExecutor, LocalJupyterCodeExecutor] skip = False skip_reason = "" except ImportError: skip = True - skip_reason = "Dependencies for EmbeddedIPythonCodeExecutor not installed." + skip_reason = "Dependencies for EmbeddedIPythonCodeExecutor or LocalJupyterCodeExecutor not installed." + classes_to_test = [] @pytest.mark.skipif(skip, reason=skip_reason) -def test_create() -> None: +@pytest.mark.parametrize("cls", classes_to_test) +def test_create(cls) -> None: config: Dict[str, Union[str, CodeExecutor]] = {"executor": "ipython-embedded"} executor = CodeExecutorFactory.create(config) assert isinstance(executor, EmbeddedIPythonCodeExecutor) - config = {"executor": EmbeddedIPythonCodeExecutor()} + config = {"executor": cls()} executor = CodeExecutorFactory.create(config) assert executor is config["executor"] @pytest.mark.skipif(skip, reason=skip_reason) -def test_init() -> None: - executor = EmbeddedIPythonCodeExecutor(timeout=10, kernel_name="python3", output_dir=".") - assert executor.timeout == 10 and executor.kernel_name == "python3" and executor.output_dir == "." +@pytest.mark.parametrize("cls", classes_to_test) +def test_init(cls) -> None: + executor = cls(timeout=10, kernel_name="python3", output_dir=".") + assert executor._timeout == 10 and executor._kernel_name == "python3" and executor._output_dir == Path(".") # Try invalid output directory. with pytest.raises(ValueError, match="Output directory .* does not exist."): - executor = EmbeddedIPythonCodeExecutor(timeout=111, kernel_name="python3", output_dir="/invalid/directory") + executor = cls(timeout=111, kernel_name="python3", output_dir="/invalid/directory") # Try invalid kernel name. with pytest.raises(ValueError, match="Kernel .* is not installed."): - executor = EmbeddedIPythonCodeExecutor(timeout=111, kernel_name="invalid_kernel_name", output_dir=".") + executor = cls(timeout=111, kernel_name="invalid_kernel_name", output_dir=".") @pytest.mark.skipif(skip, reason=skip_reason) -def test_execute_code_single_code_block() -> None: - executor = EmbeddedIPythonCodeExecutor() +@pytest.mark.parametrize("cls", classes_to_test) +def test_execute_code_single_code_block(cls) -> None: + executor = cls() code_blocks = [CodeBlock(code="import sys\nprint('hello world!')", language="python")] code_result = executor.execute_code_blocks(code_blocks) assert code_result.exit_code == 0 and "hello world!" in code_result.output @pytest.mark.skipif(skip, reason=skip_reason) -def test_execute_code_multiple_code_blocks() -> None: - executor = EmbeddedIPythonCodeExecutor() +@pytest.mark.parametrize("cls", classes_to_test) +def test_execute_code_multiple_code_blocks(cls) -> None: + executor = cls() code_blocks = [ CodeBlock(code="import sys\na = 123 + 123\n", language="python"), CodeBlock(code="print(a)", language="python"), @@ -75,8 +89,9 @@ def test_function(a, b): @pytest.mark.skipif(skip, reason=skip_reason) -def test_execute_code_bash_script() -> None: - executor = EmbeddedIPythonCodeExecutor() +@pytest.mark.parametrize("cls", classes_to_test) +def test_execute_code_bash_script(cls) -> None: + executor = cls() # Test bash script. code_blocks = [CodeBlock(code='!echo "hello world!"', language="bash")] code_result = executor.execute_code_blocks(code_blocks) @@ -84,16 +99,18 @@ def test_execute_code_bash_script() -> None: @pytest.mark.skipif(skip, reason=skip_reason) -def test_timeout() -> None: - executor = EmbeddedIPythonCodeExecutor(timeout=1) +@pytest.mark.parametrize("cls", classes_to_test) +def test_timeout(cls) -> None: + executor = cls(timeout=1) code_blocks = [CodeBlock(code="import time; time.sleep(10); print('hello world!')", language="python")] code_result = executor.execute_code_blocks(code_blocks) assert code_result.exit_code and "Timeout" in code_result.output @pytest.mark.skipif(skip, reason=skip_reason) -def test_silent_pip_install() -> None: - executor = EmbeddedIPythonCodeExecutor(timeout=600) +@pytest.mark.parametrize("cls", classes_to_test) +def test_silent_pip_install(cls) -> None: + executor = cls(timeout=600) code_blocks = [CodeBlock(code="!pip install matplotlib numpy", language="python")] code_result = executor.execute_code_blocks(code_blocks) assert code_result.exit_code == 0 and code_result.output.strip() == "" @@ -105,8 +122,9 @@ def test_silent_pip_install() -> None: @pytest.mark.skipif(skip, reason=skip_reason) -def test_restart() -> None: - executor = EmbeddedIPythonCodeExecutor() +@pytest.mark.parametrize("cls", classes_to_test) +def test_restart(cls) -> None: + executor = cls() code_blocks = [CodeBlock(code="x = 123", language="python")] code_result = executor.execute_code_blocks(code_blocks) assert code_result.exit_code == 0 and code_result.output.strip() == "" @@ -118,9 +136,10 @@ def test_restart() -> None: @pytest.mark.skipif(skip, reason=skip_reason) -def test_save_image() -> None: +@pytest.mark.parametrize("cls", classes_to_test) +def test_save_image(cls) -> None: with tempfile.TemporaryDirectory() as temp_dir: - executor = EmbeddedIPythonCodeExecutor(output_dir=temp_dir) + executor = cls(output_dir=temp_dir) # Install matplotlib. code_blocks = [CodeBlock(code="!pip install matplotlib", language="python")] code_result = executor.execute_code_blocks(code_blocks) @@ -137,9 +156,10 @@ def test_save_image() -> None: @pytest.mark.skipif(skip, reason=skip_reason) -def test_save_html() -> None: +@pytest.mark.parametrize("cls", classes_to_test) +def test_save_html(cls) -> None: with tempfile.TemporaryDirectory() as temp_dir: - executor = EmbeddedIPythonCodeExecutor(output_dir=temp_dir) + executor = cls(output_dir=temp_dir) # Test saving html. code_blocks = [ CodeBlock(code="from IPython.display import HTML\nHTML('

Hello, world!

')", language="python") @@ -152,7 +172,8 @@ def test_save_html() -> None: @pytest.mark.skipif(skip, reason=skip_reason) @pytest.mark.skipif(skip_openai, reason="openai not installed OR requested to skip") -def test_conversable_agent_capability() -> None: +@pytest.mark.parametrize("cls", classes_to_test) +def test_conversable_agent_capability(cls) -> None: KEY_LOC = "notebook" OAI_CONFIG_LIST = "OAI_CONFIG_LIST" config_list = config_list_from_json( @@ -171,7 +192,7 @@ def test_conversable_agent_capability() -> None: llm_config=llm_config, code_execution_config=False, ) - executor = EmbeddedIPythonCodeExecutor() + executor = cls() executor.user_capability.add_to_agent(agent) # Test updated system prompt. @@ -193,11 +214,12 @@ def test_conversable_agent_capability() -> None: @pytest.mark.skipif(skip, reason=skip_reason) -def test_conversable_agent_code_execution() -> None: +@pytest.mark.parametrize("cls", classes_to_test) +def test_conversable_agent_code_execution(cls) -> None: agent = ConversableAgent( "user_proxy", llm_config=False, - code_execution_config={"executor": "ipython-embedded"}, + code_execution_config={"executor": cls()}, ) msg = """ Run this code: