Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use jupyer-kernel-gateway for ipython executor #1748

Merged
merged 25 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ jobs:
pip install pytest mock
pip install jupyter-client ipykernel
python -m ipykernel install --user --name python3
pip install -e ".[local-jupyter-exec]"
sonichi marked this conversation as resolved.
Show resolved Hide resolved
- name: Set AUTOGEN_USE_DOCKER based on OS
shell: bash
run: |
Expand Down
9 changes: 9 additions & 0 deletions autogen/coding/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,12 @@ def restart(self) -> None:
This method is called when the agent is reset.
"""
... # pragma: no cover


class IPythonCodeResult(CodeResult):
"""(Experimental) A code result class for IPython code executor."""

output_files: List[str] = Field(
default_factory=list,
description="The list of files that the executed code blocks generated.",
)
16 changes: 5 additions & 11 deletions autogen/coding/embedded_ipython_code_executor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import json
import os
from pathlib import Path
import re
import uuid
from queue import Empty
Expand All @@ -11,19 +12,10 @@
from pydantic import BaseModel, Field, field_validator

from ..agentchat.agent import LLMAgent
from .base import CodeBlock, CodeExtractor, CodeResult
from .base import CodeBlock, CodeExtractor, IPythonCodeResult
from .markdown_code_extractor import MarkdownCodeExtractor

__all__ = ("EmbeddedIPythonCodeExecutor", "IPythonCodeResult")


class IPythonCodeResult(CodeResult):
"""(Experimental) A code result class for IPython code executor."""

output_files: List[str] = Field(
default_factory=list,
description="The list of files that the executed code blocks generated.",
)
__all__ = "EmbeddedIPythonCodeExecutor"


class EmbeddedIPythonCodeExecutor(BaseModel):
Expand Down Expand Up @@ -126,6 +118,8 @@ def __init__(self, **kwargs: Any):
self._kernel_client = self._kernel_manager.client()
self._kernel_client.start_channels()
self._timeout = self.timeout
self._kernel_name = self.kernel_name
self._output_dir = Path(self.output_dir)

@property
def user_capability(self) -> "EmbeddedIPythonCodeExecutor.UserCapability":
Expand Down
4 changes: 4 additions & 0 deletions autogen/coding/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,9 @@ def create(code_execution_config: Dict[str, Any]) -> CodeExecutor:
from .local_commandline_code_executor import LocalCommandlineCodeExecutor

return LocalCommandlineCodeExecutor(**code_execution_config.get("commandline-local", {}))
elif executor == "jupyter-local":
from .jupyter_code_executor import LocalJupyterCodeExecutor

return LocalJupyterCodeExecutor(**code_execution_config.get("jupyter-local", {}))
else:
raise ValueError(f"Unknown code executor {executor}")
5 changes: 5 additions & 0 deletions autogen/coding/jupyter/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .base import JupyterConnectable, JupyterConnectionInfo
from .jupyter_client import JupyterClient
from .local_jupyter_server import LocalJupyterServer

__all__ = ["JupyterConnectable", "JupyterConnectionInfo", "JupyterClient", "LocalJupyterServer"]
21 changes: 21 additions & 0 deletions autogen/coding/jupyter/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from dataclasses import dataclass
from typing import Optional, Protocol, runtime_checkable


@dataclass
class JupyterConnectionInfo:
ekzhu marked this conversation as resolved.
Show resolved Hide resolved
"""(Experimental)"""

host: str
use_https: bool
port: int
token: Optional[str]


@runtime_checkable
class JupyterConnectable(Protocol):
ekzhu marked this conversation as resolved.
Show resolved Hide resolved
"""(Experimental)"""

@property
def connection_info(self) -> JupyterConnectionInfo:
pass
200 changes: 200 additions & 0 deletions autogen/coding/jupyter/jupyter_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
from __future__ import annotations

from dataclasses import dataclass
from types import TracebackType
from typing import Any, Dict, List, Optional, cast
import sys

if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self

import json
import uuid
import datetime
import requests

import websocket
from websocket import WebSocket

from .base import JupyterConnectionInfo


class JupyterClient:
ekzhu marked this conversation as resolved.
Show resolved Hide resolved
"""(Experimental) A client for communicating with a Jupyter gateway server."""

def __init__(self, connection_info: JupyterConnectionInfo):
self._connection_info = connection_info

def _get_headers(self) -> Dict[str, str]:
if self._connection_info.token is None:
return {}
return {"Authorization": f"token {self._connection_info.token}"}

def _get_api_base_url(self) -> str:
protocol = "https" if self._connection_info.use_https else "http"
return f"{protocol}://{self._connection_info.host}:{self._connection_info.port}"

def _get_ws_base_url(self) -> str:
return f"ws://{self._connection_info.host}:{self._connection_info.port}"

def list_kernel_specs(self) -> Dict[str, Dict[str, str]]:
response = requests.get(f"{self._get_api_base_url()}/api/kernelspecs", headers=self._get_headers())
return cast(Dict[str, Dict[str, str]], response.json())

def list_kernels(self) -> List[Dict[str, str]]:
response = requests.get(f"{self._get_api_base_url()}/api/kernels", headers=self._get_headers())
return cast(List[Dict[str, str]], response.json())

def start_kernel(self, kernel_spec_name: str) -> str:
"""Start a new kernel.

Args:
kernel_spec_name (str): Name of the kernel spec to start

Returns:
str: ID of the started kernel
"""

response = requests.post(
f"{self._get_api_base_url()}/api/kernels",
headers=self._get_headers(),
json={"name": kernel_spec_name},
)
return cast(str, response.json()["id"])

def restart_kernel(self, kernel_id: str) -> None:
response = requests.post(
f"{self._get_api_base_url()}/api/kernels/{kernel_id}/restart", headers=self._get_headers()
)
response.raise_for_status()

def get_kernel_client(self, kernel_id: str) -> JupyterKernelClient:
ws_url = f"{self._get_ws_base_url()}/api/kernels/{kernel_id}/channels"
ws = websocket.create_connection(ws_url, header=self._get_headers())
return JupyterKernelClient(ws)


class JupyterKernelClient:
ekzhu marked this conversation as resolved.
Show resolved Hide resolved
"""(Experimental) A client for communicating with a Jupyter kernel."""

@dataclass
class ExecutionResult:
@dataclass
class DataItem:
mime_type: str
data: str

is_ok: bool
output: str
data_items: List[DataItem]

def __init__(self, websocket: WebSocket):
self._session_id: str = uuid.uuid4().hex
self._websocket: WebSocket = websocket

def __enter__(self) -> Self:
return self

def __exit__(
self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> None:
self._websocket.close()

def _send_message(self, *, content: Dict[str, Any], channel: str, message_type: str) -> str:
timestamp = datetime.datetime.now().isoformat()
message_id = uuid.uuid4().hex
message = {
"header": {
"username": "autogen",
"version": "5.0",
"session": self._session_id,
"msg_id": message_id,
"msg_type": message_type,
"date": timestamp,
},
"parent_header": {},
"channel": channel,
"content": content,
"metadata": {},
"buffers": {},
}
self._websocket.send_text(json.dumps(message))
return message_id

def _receive_message(self, timeout_seconds: Optional[float]) -> Optional[Dict[str, Any]]:
self._websocket.settimeout(timeout_seconds)
try:
data = self._websocket.recv()
if isinstance(data, bytes):
data = data.decode("utf-8")
return cast(Dict[str, Any], json.loads(data))
except websocket.WebSocketTimeoutException:
return None

def wait_for_ready(self, timeout_seconds: Optional[float] = None) -> bool:
message_id = self._send_message(content={}, channel="shell", message_type="kernel_info_request")
while True:
message = self._receive_message(timeout_seconds)
# This means we timed out with no new messages.
if message is None:
return False
if (
message.get("parent_header", {}).get("msg_id") == message_id
and message["msg_type"] == "kernel_info_reply"
):
return True

def execute(self, code: str, timeout_seconds: Optional[float] = None) -> ExecutionResult:
message_id = self._send_message(
content={
"code": code,
"silent": False,
"store_history": True,
"user_expressions": {},
"allow_stdin": False,
"stop_on_error": True,
},
channel="shell",
message_type="execute_request",
)

text_output = []
data_output = []
while True:
message = self._receive_message(timeout_seconds)
if message is None:
return JupyterKernelClient.ExecutionResult(
is_ok=False, output="ERROR: Timeout waiting for output from code block.", data_items=[]
)

# Ignore messages that are not for this execution.
if message.get("parent_header", {}).get("msg_id") != message_id:
continue

msg_type = message["msg_type"]
content = message["content"]
if msg_type in ["execute_result", "display_data"]:
for data_type, data in content["data"].items():
if data_type == "text/plain":
text_output.append(data)
elif data_type.startswith("image/") or data_type == "text/html":
data_output.append(self.ExecutionResult.DataItem(mime_type=data_type, data=data))
else:
text_output.append(json.dumps(data))
elif msg_type == "stream":
text_output.append(content["text"])
elif msg_type == "error":
# Output is an error.
return JupyterKernelClient.ExecutionResult(
is_ok=False,
output=f"ERROR: {content['ename']}: {content['evalue']}\n{content['traceback']}",
data_items=[],
)
if msg_type == "status" and content["execution_state"] == "idle":
break

return JupyterKernelClient.ExecutionResult(
is_ok=True, output="\n".join([str(output) for output in text_output]), data_items=data_output
)
Loading
Loading