Skip to content

Commit bc26ec3

Browse files
authored
Azure container code execution (#346)
* Azure container code execution * fix check errors. Remove unnecessary file * add TokenProvider class * update deps * addressing PR comments * update docstring * switch to aiohttp * fix client timeout
1 parent 698a8f3 commit bc26ec3

File tree

7 files changed

+608
-26
lines changed

7 files changed

+608
-26
lines changed

python/pyproject.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ dependencies = [
2222
"types-aiofiles",
2323
"grpcio",
2424
"protobuf",
25-
"tiktoken"
25+
"tiktoken",
26+
"azure-core"
2627
]
2728

2829
[tool.hatch.envs.default]

python/src/agnext/components/code_executor/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
from ._base import CodeBlock, CodeExecutor, CodeResult
22
from ._func_with_reqs import Alias, FunctionWithRequirements, Import, ImportFromModule, with_requirements
3+
from ._impl.azure_container_code_executor import AzureContainerCodeExecutor
34
from ._impl.command_line_code_result import CommandLineCodeResult
45
from ._impl.local_commandline_code_executor import LocalCommandLineCodeExecutor
56

67
__all__ = [
8+
"AzureContainerCodeExecutor",
79
"LocalCommandLineCodeExecutor",
810
"CommandLineCodeResult",
911
"CodeBlock",

python/src/agnext/components/code_executor/_base.py

+5
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ async def execute_code_blocks(
4141
4242
Returns:
4343
CodeResult: The result of the code execution.
44+
45+
Raises:
46+
ValueError: Errors in user inputs
47+
asyncio.TimeoutError: Code execution timeouts
48+
asyncio.CancelledError: The CancellationToken was cancelled during execution
4449
"""
4550
...
4651

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
# File based on: https://github.com/microsoft/autogen/blob/main/autogen/coding/local_commandline_code_executor.py
2+
# Credit to original authors
3+
4+
import asyncio
5+
from string import Template
6+
from typing import Any, Callable, ClassVar, List, Optional, Protocol, Sequence, Union
7+
from uuid import UUID, uuid4
8+
9+
import aiohttp
10+
from azure.core.credentials import AccessToken
11+
12+
# from azure.mgmt.appcontainers import ContainerAppsAPIClient
13+
from typing_extensions import ParamSpec
14+
15+
from ....core import CancellationToken
16+
from .._base import CodeBlock, CodeExecutor, CodeResult
17+
from .._func_with_reqs import (
18+
FunctionWithRequirements,
19+
FunctionWithRequirementsStr,
20+
build_python_functions_file,
21+
to_stub,
22+
)
23+
from .utils import PYTHON_VARIANTS, get_required_packages, lang_to_cmd # type: ignore
24+
25+
__all__ = ("AzureContainerCodeExecutor", "TokenProvider")
26+
27+
A = ParamSpec("A")
28+
29+
30+
class TokenProvider(Protocol):
    """Structural type for credential objects that expose a ``get_token`` method."""

    def get_token(
        self,
        *scopes: str,
        claims: Optional[str] = None,
        tenant_id: Optional[str] = None,
        **kwargs: Any,
    ) -> AccessToken:
        """Return an ``AccessToken`` for the requested ``scopes``.

        Args:
            *scopes (str): The scopes the token is requested for.
            claims (Optional[str]): Optional claims value forwarded to the provider.
            tenant_id (Optional[str]): Optional tenant identifier forwarded to the provider.
            **kwargs (Any): Additional provider-specific keyword arguments.
        """
        ...
34+
35+
36+
class AzureContainerCodeExecutor(CodeExecutor):
    """(Experimental) Execute Python code blocks through an Azure Container Apps endpoint.

    Each code block is POSTed to ``<pool_management_endpoint>/python/execute`` with a
    bearer token obtained from the supplied credential.
    """

    SUPPORTED_LANGUAGES: ClassVar[List[str]] = [
        "python",
    ]
    FUNCTION_PROMPT_TEMPLATE: ClassVar[str] = """You have access to the following user defined functions.

$functions"""

    def __init__(
        self,
        pool_management_endpoint: str,
        credential: TokenProvider,
        timeout: int = 60,
        functions: Sequence[
            Union[
                FunctionWithRequirements[Any, A],
                Callable[..., Any],
                FunctionWithRequirementsStr,
            ]
        ] = [],
        functions_module: str = "functions",
        persist_session: bool = False,
    ):
        """(Experimental) A code executor class that executes code through an Azure
        Container Apps instance.

        **This will execute LLM generated code on an Azure dynamic code container.**

        The execution environment is similar to that of a jupyter notebook which allows for incremental code execution. The parameter functions are executed in order once at the beginning of each session. Each code block is then executed serially and in the order they are received. Each environment has a statically defined set of available packages which cannot be changed.
        Currently, attempting to use packages beyond what is available on the environment will result in an error. To get the list of supported packages, call the `get_available_packages` function.
        Currently the only supported language is Python.
        For Python code, use the language "python" for the code block.

        Args:
            pool_management_endpoint (str): The azure container apps dynamic sessions endpoint.
            credential (TokenProvider): An object that implements the get_token function.
            timeout (int): The timeout for the execution of any single code block. Default is 60.
            functions (List[Union[FunctionWithRequirements[Any, A], Callable[..., Any]]]): A list of functions that are available to the code executor. Default is an empty list.
            functions_module (str): The module name for the functions; must be a valid Python identifier. Default is "functions".
            persist_session (bool): True - reuse the same azure session ID until restart() is called. False - Refresh the azure session ID for every call to execute_code(). Default is False.

        Raises:
            ValueError: If ``timeout`` is less than 1 or ``functions_module`` is not a valid identifier.
        """

        if timeout < 1:
            raise ValueError("Timeout must be greater than or equal to 1.")

        if not functions_module.isidentifier():
            raise ValueError("Module name must be a valid Python identifier")

        self._functions_module = functions_module

        self._timeout = timeout

        # Keep a private copy so neither the shared default list nor a list the
        # caller mutates later is aliased by this instance.
        self._functions = list(functions)
        self._func_code: str | None = None
        # Setup could take some time so we intentionally wait for the first code block to do it.
        self._setup_functions_complete = len(self._functions) == 0

        self._pool_management_endpoint = pool_management_endpoint
        self._access_token: str | None = None
        self._persist_session = persist_session
        self._uuid: UUID = uuid4()
        self._available_packages: set[str] | None = None
        self._credential: TokenProvider = credential

    # TODO: expiration?
    def _ensure_access_token(self) -> None:
        """Fetch and cache a bearer token from the credential if none is cached yet."""
        if not self._access_token:
            scope = "https://dynamicsessions.io"
            self._access_token = self._credential.get_token(scope).token

    def format_functions_for_prompt(self, prompt_template: str = FUNCTION_PROMPT_TEMPLATE) -> str:
        """(Experimental) Format the functions for a prompt.

        The template includes one variable:
        - `$functions`: The functions formatted as stubs with two newlines between each function.

        Args:
            prompt_template (str): The prompt template. Default is the class default.

        Returns:
            str: The formatted prompt.
        """

        template = Template(prompt_template)
        return template.substitute(
            functions="\n\n".join([to_stub(func) for func in self._functions]),
        )

    @property
    def functions_module(self) -> str:
        """(Experimental) The module name for the functions."""
        return self._functions_module

    @property
    def functions(self) -> List[str]:
        """Not implemented; accessing this property raises ``NotImplementedError``."""
        raise NotImplementedError

    @property
    def timeout(self) -> int:
        """(Experimental) The timeout for code execution."""
        return self._timeout

    async def get_available_packages(self, cancellation_token: CancellationToken) -> set[str]:
        """Return the names of the packages installed in the remote environment.

        A cached set is returned when one is present; otherwise a short snippet
        listing ``pkg_resources.working_set`` is executed remotely and its output
        is parsed.

        Args:
            cancellation_token (CancellationToken): a token to cancel the operation

        Returns:
            set[str]: The package names reported by the environment.

        Raises:
            ValueError: If the listing snippet finishes with a non-zero exit code.
        """
        if self._available_packages is not None:
            return self._available_packages
        avail_pkgs = """
import pkg_resources\n[d.project_name for d in pkg_resources.working_set]
"""
        ret = await self._execute_code_dont_check_setup(
            [CodeBlock(code=avail_pkgs, language="python")], cancellation_token
        )
        if ret.exit_code != 0:
            raise ValueError(f"Failed to get list of available packages: {ret.output.strip()}")
        # The output is the textual repr of a list. Split on commas and strip
        # whitespace/quotes so both a one-line repr and a one-item-per-line
        # pretty-printed repr are parsed the same way.
        pkgs = ret.output.strip().strip("[]")
        names = (item.strip(" \t\r\n'\"") for item in pkgs.split(","))
        return {name for name in names if name}

    async def _populate_available_packages(self, cancellation_token: CancellationToken) -> None:
        """Query the environment for its packages and cache the result on the instance."""
        self._available_packages = await self.get_available_packages(cancellation_token)

    async def _setup_functions(self, cancellation_token: CancellationToken) -> None:
        """Load the user supplied functions into the remote session.

        Builds the functions file once, verifies that the packages declared by
        ``FunctionWithRequirements`` entries are available (when the available
        package set is known), then executes the file remotely.

        Raises:
            ValueError: If a required package is unavailable or the functions file fails to load.
        """
        if not self._func_code:
            self._func_code = build_python_functions_file(self._functions)

        # Check required function imports and packages
        lists_of_packages = [x.python_packages for x in self._functions if isinstance(x, FunctionWithRequirements)]
        # Should we also be checking the imports?

        required_packages = {item for sublist in lists_of_packages for item in sublist}
        if self._available_packages is not None:
            missing_pkgs = required_packages - self._available_packages
            if len(missing_pkgs) > 0:
                raise ValueError(f"Packages unavailable in environment: {missing_pkgs}")

        # Attempt to load the function file to check for syntax errors, imports etc.
        exec_result = await self._execute_code_dont_check_setup(
            [CodeBlock(code=self._func_code, language="python")], cancellation_token
        )

        if exec_result.exit_code != 0:
            raise ValueError(f"Functions failed to load: {exec_result.output.strip()}")

        self._setup_functions_complete = True

    async def execute_code_blocks(
        self, code_blocks: List[CodeBlock], cancellation_token: CancellationToken
    ) -> CodeResult:
        """(Experimental) Execute the code blocks and return the result.

        When the session is not persisted, the executor is restarted first. The
        available package list and the user functions are then set up if needed
        before the code blocks are run.

        Args:
            code_blocks (List[CodeBlock]): The code blocks to execute.
            cancellation_token (CancellationToken): a token to cancel the operation

        Returns:
            CodeResult: The result of the code execution."""
        if not self._persist_session:
            self.restart()
        if self._available_packages is None:
            await self._populate_available_packages(cancellation_token)
        if not self._setup_functions_complete:
            await self._setup_functions(cancellation_token)

        return await self._execute_code_dont_check_setup(code_blocks, cancellation_token)

    # The http call here should be replaced by an actual Azure client call once its available
    async def _execute_code_dont_check_setup(
        self, code_blocks: List[CodeBlock], cancellation_token: CancellationToken
    ) -> CodeResult:
        """Send each code block to the endpoint without performing any session setup.

        Processing stops at the first block whose language is unsupported or whose
        ``pip install`` requirements are not in the known available package set.

        Args:
            code_blocks (List[CodeBlock]): The code blocks to execute.
            cancellation_token (CancellationToken): a token to cancel the operation

        Returns:
            CodeResult: Exit code (0 or 1) and the accumulated output of all blocks.

        Raises:
            asyncio.TimeoutError: The request did not complete within the timeout.
            asyncio.CancelledError: The request was cancelled.
            ConnectionError: The endpoint answered with an HTTP error status.
        """
        logs_all = ""
        exitcode = 0
        self._ensure_access_token()

        # TODO: Better to use the client auth system rather than headers
        headers = {"Authorization": f"Bearer {self._access_token}"}
        properties = {
            "identifier": str(self._uuid),
            "codeInputType": "inline",
            "executionType": "synchronous",
            "pythonCode": "",
            "timeoutInSeconds": self._timeout,
        }
        timeout = aiohttp.ClientTimeout(total=float(self._timeout))
        async with aiohttp.ClientSession(timeout=timeout) as client:
            for code_block in code_blocks:
                lang, code = code_block.language, code_block.code
                lang = lang.lower()

                if lang in PYTHON_VARIANTS:
                    lang = "python"

                if lang not in self.SUPPORTED_LANGUAGES:
                    # In case the language is not supported, we return an error message.
                    exitcode = 1
                    logs_all += "\n" + f"unknown language {lang}"
                    break

                if self._available_packages is not None:
                    req_pkgs = get_required_packages(code, lang)
                    missing_pkgs = req_pkgs - self._available_packages
                    if len(missing_pkgs) > 0:
                        # In case the code requires packages that are not available in the environment
                        exitcode = 1
                        logs_all += "\n" + f"Python packages unavailable in environment: {missing_pkgs}"
                        break

                properties["pythonCode"] = code

                task = asyncio.create_task(
                    client.post(
                        self._pool_management_endpoint + "/python/execute",
                        headers=headers,
                        json={"properties": properties},
                    )
                )

                # Linking lets the cancellation token cancel the in-flight request.
                cancellation_token.link_future(task)
                try:
                    response = await asyncio.wait_for(task, self._timeout)
                    response.raise_for_status()
                    data = await response.json()
                    logs_all += data.get("stderr", "") + data.get("stdout", "")

                    if "Success" in data["status"]:
                        logs_all += str(data["result"])
                    elif "Failure" in data["status"]:
                        exitcode = 1
                    # This case is in the official code example https://github.com/Azure-Samples/container-apps-dynamic-sessions-samples/blob/dd2b3827bc8ea489b8f088654847239e2d51743f/autogen-python-webapi/aca_sessions_executor.py
                    # I have not seen this case actually occur before
                    if "error" in data:
                        logs_all += f"\n{data['error']}"
                        exitcode = 1

                except asyncio.TimeoutError as e:
                    logs_all += "\n Timeout"
                    # e.add_note is only in py 3.11+
                    raise asyncio.TimeoutError(logs_all) from e
                except asyncio.CancelledError as e:
                    logs_all += "\n Cancelled"
                    # e.add_note is only in py 3.11+
                    raise asyncio.CancelledError(logs_all) from e
                except aiohttp.ClientResponseError as e:
                    logs_all += "\nError while sending code block to endpoint"
                    raise ConnectionError(logs_all) from e

        return CodeResult(exit_code=exitcode, output=logs_all)

    def restart(self) -> None:
        """(Experimental) Restart the code executor.

        Generates a new session identifier and discards the cached access token
        and available package list. Function setup is only marked as pending when
        there are functions to load, matching the initial state set in ``__init__``.
        """
        self._uuid = uuid4()
        self._setup_functions_complete = len(self._functions) == 0
        self._access_token = None
        self._available_packages = None

python/src/agnext/components/code_executor/_impl/utils.py

+18
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,24 @@ def silence_pip(code: str, lang: str) -> str:
4646
return "\n".join(lines)
4747

4848

49+
def get_required_packages(code: str, lang: str) -> set[str]:
    """Collect the package names requested by ``pip install`` lines in ``code``.

    Only lines of the form ``!pip install ...`` or ``! pip install ...`` are
    considered. Requirements may be separated by whitespace and/or commas, and
    the results of all matching lines are accumulated. Tokens starting with
    ``-`` are treated as option flags and skipped; values that follow an
    option (for example the file name after ``-r``) are not recognised as such.

    Args:
        code (str): The source code to scan.
        lang (str): The language of the code; only "python" is scanned.

    Returns:
        set[str]: The requested requirement strings, or an empty set when the
        language is not "python" or no install line is present.
    """
    if lang != "python":
        return set()

    pattern = re.compile(r"^! ?pip install(.*)$")
    required: set[str] = set()
    for line in code.split("\n"):
        match = pattern.search(line)
        if match is None:
            continue
        for token in re.split(r"[,\s]+", match.group(1)):
            # Drop the empty strings produced by leading/trailing separators
            # and anything that looks like a command line option.
            if token and not token.startswith("-"):
                required.add(token)
    return required
65+
66+
4967
PYTHON_VARIANTS = ["python", "Python", "py"]
5068

5169

0 commit comments

Comments
 (0)