Skip to content

Commit 0d6d074

Browse files
jackgerritsekzhu
authored andcommitted
Use jupyer-kernel-gateway for ipython executor (#1748)
* checkpoint async based * Implement jupyter client and use jupyer gateway * update deps * address comments * add missing parenthesis * Update build.yml * CI fixes * change requirement name * debug * print stderr * dont seek * show token * mitigaton for windows bug * use hex token to avoid - in token * formatting * put back in place original while the windows bug exists * lint * Update autogen/coding/jupyter_code_executor.py * Update jupyter_code_executor.py * Update test_embedded_ipython_code_executor.py * Update setup.py * Update build.yml * fix nameerror --------- Co-authored-by: Eric Zhu <[email protected]>
1 parent aff1459 commit 0d6d074

11 files changed

+673
-39
lines changed

.github/workflows/build.yml

+5
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,13 @@ jobs:
4242
pip install -e .
4343
python -c "import autogen"
4444
pip install pytest mock
45+
- name: Install optional dependencies for code executors
46+
# code executors auto skip without deps, so only run for python 3.11
47+
if: matrix.python-version == '3.11'
48+
run: |
4549
pip install jupyter-client ipykernel
4650
python -m ipykernel install --user --name python3
51+
pip install -e ".[local-jupyter-exec]"
4752
- name: Set AUTOGEN_USE_DOCKER based on OS
4853
shell: bash
4954
run: |

autogen/coding/base.py

+9
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,12 @@ def restart(self) -> None:
9292
This method is called when the agent is reset.
9393
"""
9494
... # pragma: no cover
95+
96+
97+
class IPythonCodeResult(CodeResult):
98+
"""(Experimental) A code result class for IPython code executor."""
99+
100+
output_files: List[str] = Field(
101+
default_factory=list,
102+
description="The list of files that the executed code blocks generated.",
103+
)

autogen/coding/embedded_ipython_code_executor.py

+5-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import base64
22
import json
33
import os
4+
from pathlib import Path
45
import re
56
import uuid
67
from queue import Empty
@@ -11,19 +12,10 @@
1112
from pydantic import BaseModel, Field, field_validator
1213

1314
from ..agentchat.agent import LLMAgent
14-
from .base import CodeBlock, CodeExtractor, CodeResult
15+
from .base import CodeBlock, CodeExtractor, IPythonCodeResult
1516
from .markdown_code_extractor import MarkdownCodeExtractor
1617

17-
__all__ = ("EmbeddedIPythonCodeExecutor", "IPythonCodeResult")
18-
19-
20-
class IPythonCodeResult(CodeResult):
21-
"""(Experimental) A code result class for IPython code executor."""
22-
23-
output_files: List[str] = Field(
24-
default_factory=list,
25-
description="The list of files that the executed code blocks generated.",
26-
)
18+
__all__ = "EmbeddedIPythonCodeExecutor"
2719

2820

2921
class EmbeddedIPythonCodeExecutor(BaseModel):
@@ -126,6 +118,8 @@ def __init__(self, **kwargs: Any):
126118
self._kernel_client = self._kernel_manager.client()
127119
self._kernel_client.start_channels()
128120
self._timeout = self.timeout
121+
self._kernel_name = self.kernel_name
122+
self._output_dir = Path(self.output_dir)
129123

130124
@property
131125
def user_capability(self) -> "EmbeddedIPythonCodeExecutor.UserCapability":

autogen/coding/factory.py

+4
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,9 @@ def create(code_execution_config: Dict[str, Any]) -> CodeExecutor:
3737
from .local_commandline_code_executor import LocalCommandlineCodeExecutor
3838

3939
return LocalCommandlineCodeExecutor(**code_execution_config.get("commandline-local", {}))
40+
elif executor == "jupyter-local":
41+
from .jupyter_code_executor import LocalJupyterCodeExecutor
42+
43+
return LocalJupyterCodeExecutor(**code_execution_config.get("jupyter-local", {}))
4044
else:
4145
raise ValueError(f"Unknown code executor {executor}")

autogen/coding/jupyter/__init__.py

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from .base import JupyterConnectable, JupyterConnectionInfo
2+
from .jupyter_client import JupyterClient
3+
from .local_jupyter_server import LocalJupyterServer
4+
5+
__all__ = ["JupyterConnectable", "JupyterConnectionInfo", "JupyterClient", "LocalJupyterServer"]

autogen/coding/jupyter/base.py

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from dataclasses import dataclass
2+
from typing import Optional, Protocol, runtime_checkable
3+
4+
5+
@dataclass
6+
class JupyterConnectionInfo:
7+
"""(Experimental)"""
8+
9+
host: str
10+
use_https: bool
11+
port: int
12+
token: Optional[str]
13+
14+
15+
@runtime_checkable
16+
class JupyterConnectable(Protocol):
17+
"""(Experimental)"""
18+
19+
@property
20+
def connection_info(self) -> JupyterConnectionInfo:
21+
pass
+200
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
from __future__ import annotations
2+
3+
from dataclasses import dataclass
4+
from types import TracebackType
5+
from typing import Any, Dict, List, Optional, cast
6+
import sys
7+
8+
if sys.version_info >= (3, 11):
9+
from typing import Self
10+
else:
11+
from typing_extensions import Self
12+
13+
import json
14+
import uuid
15+
import datetime
16+
import requests
17+
18+
import websocket
19+
from websocket import WebSocket
20+
21+
from .base import JupyterConnectionInfo
22+
23+
24+
class JupyterClient:
25+
"""(Experimental) A client for communicating with a Jupyter gateway server."""
26+
27+
def __init__(self, connection_info: JupyterConnectionInfo):
28+
self._connection_info = connection_info
29+
30+
def _get_headers(self) -> Dict[str, str]:
31+
if self._connection_info.token is None:
32+
return {}
33+
return {"Authorization": f"token {self._connection_info.token}"}
34+
35+
def _get_api_base_url(self) -> str:
36+
protocol = "https" if self._connection_info.use_https else "http"
37+
return f"{protocol}://{self._connection_info.host}:{self._connection_info.port}"
38+
39+
def _get_ws_base_url(self) -> str:
40+
return f"ws://{self._connection_info.host}:{self._connection_info.port}"
41+
42+
def list_kernel_specs(self) -> Dict[str, Dict[str, str]]:
43+
response = requests.get(f"{self._get_api_base_url()}/api/kernelspecs", headers=self._get_headers())
44+
return cast(Dict[str, Dict[str, str]], response.json())
45+
46+
def list_kernels(self) -> List[Dict[str, str]]:
47+
response = requests.get(f"{self._get_api_base_url()}/api/kernels", headers=self._get_headers())
48+
return cast(List[Dict[str, str]], response.json())
49+
50+
def start_kernel(self, kernel_spec_name: str) -> str:
51+
"""Start a new kernel.
52+
53+
Args:
54+
kernel_spec_name (str): Name of the kernel spec to start
55+
56+
Returns:
57+
str: ID of the started kernel
58+
"""
59+
60+
response = requests.post(
61+
f"{self._get_api_base_url()}/api/kernels",
62+
headers=self._get_headers(),
63+
json={"name": kernel_spec_name},
64+
)
65+
return cast(str, response.json()["id"])
66+
67+
def restart_kernel(self, kernel_id: str) -> None:
68+
response = requests.post(
69+
f"{self._get_api_base_url()}/api/kernels/{kernel_id}/restart", headers=self._get_headers()
70+
)
71+
response.raise_for_status()
72+
73+
def get_kernel_client(self, kernel_id: str) -> JupyterKernelClient:
74+
ws_url = f"{self._get_ws_base_url()}/api/kernels/{kernel_id}/channels"
75+
ws = websocket.create_connection(ws_url, header=self._get_headers())
76+
return JupyterKernelClient(ws)
77+
78+
79+
class JupyterKernelClient:
80+
"""(Experimental) A client for communicating with a Jupyter kernel."""
81+
82+
@dataclass
83+
class ExecutionResult:
84+
@dataclass
85+
class DataItem:
86+
mime_type: str
87+
data: str
88+
89+
is_ok: bool
90+
output: str
91+
data_items: List[DataItem]
92+
93+
def __init__(self, websocket: WebSocket):
94+
self._session_id: str = uuid.uuid4().hex
95+
self._websocket: WebSocket = websocket
96+
97+
def __enter__(self) -> Self:
98+
return self
99+
100+
def __exit__(
101+
self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
102+
) -> None:
103+
self._websocket.close()
104+
105+
def _send_message(self, *, content: Dict[str, Any], channel: str, message_type: str) -> str:
106+
timestamp = datetime.datetime.now().isoformat()
107+
message_id = uuid.uuid4().hex
108+
message = {
109+
"header": {
110+
"username": "autogen",
111+
"version": "5.0",
112+
"session": self._session_id,
113+
"msg_id": message_id,
114+
"msg_type": message_type,
115+
"date": timestamp,
116+
},
117+
"parent_header": {},
118+
"channel": channel,
119+
"content": content,
120+
"metadata": {},
121+
"buffers": {},
122+
}
123+
self._websocket.send_text(json.dumps(message))
124+
return message_id
125+
126+
def _receive_message(self, timeout_seconds: Optional[float]) -> Optional[Dict[str, Any]]:
127+
self._websocket.settimeout(timeout_seconds)
128+
try:
129+
data = self._websocket.recv()
130+
if isinstance(data, bytes):
131+
data = data.decode("utf-8")
132+
return cast(Dict[str, Any], json.loads(data))
133+
except websocket.WebSocketTimeoutException:
134+
return None
135+
136+
def wait_for_ready(self, timeout_seconds: Optional[float] = None) -> bool:
137+
message_id = self._send_message(content={}, channel="shell", message_type="kernel_info_request")
138+
while True:
139+
message = self._receive_message(timeout_seconds)
140+
# This means we timed out with no new messages.
141+
if message is None:
142+
return False
143+
if (
144+
message.get("parent_header", {}).get("msg_id") == message_id
145+
and message["msg_type"] == "kernel_info_reply"
146+
):
147+
return True
148+
149+
def execute(self, code: str, timeout_seconds: Optional[float] = None) -> ExecutionResult:
150+
message_id = self._send_message(
151+
content={
152+
"code": code,
153+
"silent": False,
154+
"store_history": True,
155+
"user_expressions": {},
156+
"allow_stdin": False,
157+
"stop_on_error": True,
158+
},
159+
channel="shell",
160+
message_type="execute_request",
161+
)
162+
163+
text_output = []
164+
data_output = []
165+
while True:
166+
message = self._receive_message(timeout_seconds)
167+
if message is None:
168+
return JupyterKernelClient.ExecutionResult(
169+
is_ok=False, output="ERROR: Timeout waiting for output from code block.", data_items=[]
170+
)
171+
172+
# Ignore messages that are not for this execution.
173+
if message.get("parent_header", {}).get("msg_id") != message_id:
174+
continue
175+
176+
msg_type = message["msg_type"]
177+
content = message["content"]
178+
if msg_type in ["execute_result", "display_data"]:
179+
for data_type, data in content["data"].items():
180+
if data_type == "text/plain":
181+
text_output.append(data)
182+
elif data_type.startswith("image/") or data_type == "text/html":
183+
data_output.append(self.ExecutionResult.DataItem(mime_type=data_type, data=data))
184+
else:
185+
text_output.append(json.dumps(data))
186+
elif msg_type == "stream":
187+
text_output.append(content["text"])
188+
elif msg_type == "error":
189+
# Output is an error.
190+
return JupyterKernelClient.ExecutionResult(
191+
is_ok=False,
192+
output=f"ERROR: {content['ename']}: {content['evalue']}\n{content['traceback']}",
193+
data_items=[],
194+
)
195+
if msg_type == "status" and content["execution_state"] == "idle":
196+
break
197+
198+
return JupyterKernelClient.ExecutionResult(
199+
is_ok=True, output="\n".join([str(output) for output in text_output]), data_items=data_output
200+
)

0 commit comments

Comments
 (0)