Skip to content

Commit

Permalink
Update local cli executor to use same filename strategy as docker (#1981
Browse files Browse the repository at this point in the history
)

* consistent file saving across cli executors

* test fixes

* feedback

* make path

* formatting

* run timeout test on windows

---------

Co-authored-by: Chi Wang <[email protected]>
  • Loading branch information
jackgerrits and sonichi authored Mar 12, 2024
1 parent a814ba5 commit d583c80
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 87 deletions.
11 changes: 10 additions & 1 deletion autogen/coding/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Protocol, Union, runtime_checkable
from typing import Any, Dict, List, Optional, Protocol, Union, runtime_checkable

from pydantic import BaseModel, Field

Expand Down Expand Up @@ -77,3 +77,12 @@ class IPythonCodeResult(CodeResult):
default_factory=list,
description="The list of files that the executed code blocks generated.",
)


class CommandLineCodeResult(CodeResult):
"""(Experimental) A code result class for command line code executor."""

code_file: Optional[str] = Field(
default=None,
description="The file that the executed code block was saved to.",
)
31 changes: 11 additions & 20 deletions autogen/coding/docker_commandline_code_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
from docker.models.containers import Container
from docker.errors import ImageNotFound

from .local_commandline_code_executor import CommandLineCodeResult
from .utils import _get_file_name_from_content
from .base import CommandLineCodeResult

from ..code_utils import TIMEOUT_MSG, _cmd
from .base import CodeBlock, CodeExecutor, CodeExtractor
Expand Down Expand Up @@ -168,25 +169,15 @@ def execute_code_blocks(self, code_blocks: List[CodeBlock]) -> CommandLineCodeRe
lang = code_block.language
code = code_block.code

code_hash = md5(code.encode()).hexdigest()

# Check if there is a filename comment
# Get first line
first_line = code.split("\n")[0]
if first_line.startswith("# filename:"):
filename = first_line.split(":")[1].strip()

# Handle relative paths in the filename
path = Path(filename)
if not path.is_absolute():
path = Path("/workspace") / path
path = path.resolve()
try:
path.relative_to(Path("/workspace"))
except ValueError:
return CommandLineCodeResult(exit_code=1, output="Filename is not in the workspace")
else:
# create a file with a automatically generated name
try:
# Check if there is a filename comment
filename = _get_file_name_from_content(code, Path("/workspace"))
except ValueError:
return CommandLineCodeResult(exit_code=1, output="Filename is not in the workspace")

if filename is None:
# create a file with an automatically generated name
code_hash = md5(code.encode()).hexdigest()
filename = f"tmp_code_{code_hash}.{'py' if lang.startswith('python') else lang}"

code_path = self._work_dir / filename
Expand Down
97 changes: 55 additions & 42 deletions autogen/coding/local_commandline_code_executor.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,27 @@
from hashlib import md5
import os
from pathlib import Path
import re
import sys
import uuid
import warnings
from typing import ClassVar, List, Optional, Union
from pydantic import Field
from typing import ClassVar, List, Union

from ..agentchat.agent import LLMAgent
from ..code_utils import execute_code
from .base import CodeBlock, CodeExecutor, CodeExtractor, CodeResult
from ..code_utils import TIMEOUT_MSG, WIN32, _cmd, execute_code
from .base import CodeBlock, CodeExecutor, CodeExtractor, CommandLineCodeResult
from .markdown_code_extractor import MarkdownCodeExtractor

__all__ = (
"LocalCommandLineCodeExecutor",
"CommandLineCodeResult",
)
from .utils import _get_file_name_from_content

import subprocess

class CommandLineCodeResult(CodeResult):
"""(Experimental) A code result class for command line code executor."""

code_file: Optional[str] = Field(
default=None,
description="The file that the executed code block was saved to.",
)
__all__ = ("LocalCommandLineCodeExecutor",)


class LocalCommandLineCodeExecutor(CodeExecutor):
SUPPORTED_LANGUAGES: ClassVar[List[str]] = ["bash", "shell", "sh", "pwsh", "powershell", "ps1", "python"]

def __init__(
self,
timeout: int = 60,
Expand Down Expand Up @@ -113,41 +108,59 @@ def execute_code_blocks(self, code_blocks: List[CodeBlock]) -> CommandLineCodeRe
Returns:
CommandLineCodeResult: The result of the code execution."""
logs_all = ""
file_names = []
for code_block in code_blocks:
lang, code = code_block.language, code_block.code
lang = lang.lower()

LocalCommandLineCodeExecutor.sanitize_command(lang, code)
filename_uuid = uuid.uuid4().hex
filename = None
if lang in ["bash", "shell", "sh", "pwsh", "powershell", "ps1"]:
filename = f"{filename_uuid}.{lang}"
exitcode, logs, _ = execute_code(
code=code,
lang=lang,
timeout=self._timeout,
work_dir=str(self._work_dir),
filename=filename,
use_docker=False,
)
elif lang in ["python", "Python"]:
filename = f"{filename_uuid}.py"
exitcode, logs, _ = execute_code(
code=code,
lang="python",
timeout=self._timeout,
work_dir=str(self._work_dir),
filename=filename,
use_docker=False,
)
else:

if WIN32 and lang in ["sh", "shell"]:
lang = "ps1"

if lang not in self.SUPPORTED_LANGUAGES:
# In case the language is not supported, we return an error message.
exitcode, logs, _ = (1, f"unknown language {lang}", None)
logs_all += "\n" + logs
exitcode = 1
logs_all += "\n" + f"unknown language {lang}"
break

try:
# Check if there is a filename comment
filename = _get_file_name_from_content(code, self._work_dir)
except ValueError:
return CommandLineCodeResult(exit_code=1, output="Filename is not in the workspace")

if filename is None:
# create a file with an automatically generated name
code_hash = md5(code.encode()).hexdigest()
filename = f"tmp_code_{code_hash}.{'py' if lang.startswith('python') else lang}"

written_file = (self._work_dir / filename).resolve()
written_file.open("w", encoding="utf-8").write(code)
file_names.append(written_file)

program = sys.executable if lang.startswith("python") else _cmd(lang)
cmd = [program, str(written_file.absolute())]

try:
result = subprocess.run(
cmd, cwd=self._work_dir, capture_output=True, text=True, timeout=float(self._timeout)
)
except subprocess.TimeoutExpired:
logs_all += "\n" + TIMEOUT_MSG
# Same exit code as the timeout command on linux.
exitcode = 124
break

logs_all += result.stderr
logs_all += result.stdout
exitcode = result.returncode

if exitcode != 0:
break

code_filename = str(self._work_dir / filename) if filename is not None else None
return CommandLineCodeResult(exit_code=exitcode, output=logs_all, code_file=code_filename)
code_file = str(file_names[0]) if len(file_names) > 0 else None
return CommandLineCodeResult(exit_code=exitcode, output=logs_all, code_file=code_file)

def restart(self) -> None:
"""(Experimental) Restart the code executor."""
Expand Down
22 changes: 22 additions & 0 deletions autogen/coding/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Will return the filename relative to the workspace path
from pathlib import Path
from typing import Optional


# Raises ValueError if the file is not in the workspace
def _get_file_name_from_content(code: str, workspace_path: Path) -> Optional[str]:
first_line = code.split("\n")[0]
# TODO - support other languages
if first_line.startswith("# filename:"):
filename = first_line.split(":")[1].strip()

# Handle relative paths in the filename
path = Path(filename)
if not path.is_absolute():
path = workspace_path / path
path = path.resolve()
# Throws an error if the file is not in the workspace
relative = path.relative_to(workspace_path.resolve())
return str(relative)

return None
40 changes: 16 additions & 24 deletions test/coding/test_commandline_code_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ def _test_execute_code(executor: CodeExecutor) -> None:
assert file_line.strip() == code_line.strip()


@pytest.mark.skipif(sys.platform in ["win32"], reason="do not run on windows")
@pytest.mark.parametrize("cls", classes_to_test)
def test_commandline_code_executor_timeout(cls) -> None:
with tempfile.TemporaryDirectory() as temp_dir:
Expand Down Expand Up @@ -193,36 +192,29 @@ def test_dangerous_commands(lang, code, expected_message):
), f"Expected message '{expected_message}' not found in '{str(exc_info.value)}'"


# This is kind of hard to test because each exec is a new env
@pytest.mark.skipif(
skip_docker or not is_docker_running(),
reason="docker is not running or requested to skip docker tests",
)
def test_docker_invalid_relative_path() -> None:
with DockerCommandLineCodeExecutor() as executor:
code = """# filename: /tmp/test.py
@pytest.mark.parametrize("cls", classes_to_test)
def test_invalid_relative_path(cls) -> None:
executor = cls()
code = """# filename: /tmp/test.py
print("hello world")
"""
result = executor.execute_code_blocks([CodeBlock(code=code, language="python")])
assert result.exit_code == 1 and "Filename is not in the workspace" in result.output
result = executor.execute_code_blocks([CodeBlock(code=code, language="python")])
assert result.exit_code == 1 and "Filename is not in the workspace" in result.output


@pytest.mark.skipif(
skip_docker or not is_docker_running(),
reason="docker is not running or requested to skip docker tests",
)
def test_docker_valid_relative_path() -> None:
@pytest.mark.parametrize("cls", classes_to_test)
def test_valid_relative_path(cls) -> None:
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir = Path(temp_dir)
with DockerCommandLineCodeExecutor(work_dir=temp_dir) as executor:
code = """# filename: test.py
executor = cls(work_dir=temp_dir)
code = """# filename: test.py
print("hello world")
"""
result = executor.execute_code_blocks([CodeBlock(code=code, language="python")])
assert result.exit_code == 0
assert "hello world" in result.output
assert "test.py" in result.code_file
assert (temp_dir / "test.py") == Path(result.code_file)
assert (temp_dir / "test.py").exists()
result = executor.execute_code_blocks([CodeBlock(code=code, language="python")])
assert result.exit_code == 0
assert "hello world" in result.output
assert "test.py" in result.code_file
assert (temp_dir / "test.py").resolve() == Path(result.code_file).resolve()
assert (temp_dir / "test.py").exists()

0 comments on commit d583c80

Please sign in to comment.