Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""
Agent factory for creating AI agent instances using Microsoft Agent Framework.
"""
from typing import Dict, Any, List, Union
from typing import Dict, Any, List, Union, Optional
from contextlib import asynccontextmanager

from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
Expand All @@ -14,32 +15,73 @@


class AgentFactory:
"""Factory for creating AI agent instances using Microsoft Agent Framework."""
"""Factory for creating AI agent instances using Microsoft Agent Framework.

This factory uses async context management patterns as recommended by the
Microsoft Agent Framework documentation. Clients should be used with async context
managers to ensure proper resource cleanup.
"""

def __init__(self):
self.chat_clients = {}
self._initialize_clients()
self._credential: Optional[DefaultAzureCredential] = None
self._azure_chat_client: Optional[AzureOpenAIChatClient] = None

async def __aenter__(self):
"""Async context manager entry."""
await self._initialize_clients()
return self

def _initialize_clients(self) -> None:
"""Initialize Azure OpenAI chat clients."""
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self._cleanup_clients()
return False

async def _initialize_clients(self) -> None:
"""Initialize Azure OpenAI chat clients with async context management."""
try:
# Initialize Azure OpenAI client using correct Microsoft Agent Framework pattern
credential = DefaultAzureCredential()
# Initialize Azure credential
self._credential = DefaultAzureCredential()

# Create Azure OpenAI Chat Client
self.azure_chat_client = AzureOpenAIChatClient(credential=credential)
# Note: AzureOpenAIChatClient can be used directly without async context
# but we store it for potential cleanup
self._azure_chat_client = AzureOpenAIChatClient(credential=self._credential)

logger.info("Azure OpenAI chat client initialized successfully")

except Exception as e:
logger.error(f"Error initializing Azure OpenAI clients: {e}")
raise

def create_agent(self, agent_config: Agent) -> ChatAgent:
"""Create an AI agent instance from configuration."""
async def _cleanup_clients(self) -> None:
"""Clean up clients and resources."""
try:
# Clean up any resources if needed
self._azure_chat_client = None
self._credential = None
logger.info("Azure OpenAI chat clients cleaned up")
except Exception as e:
logger.error(f"Error cleaning up clients: {e}")

async def create_agent(self, agent_config: Agent) -> ChatAgent:
"""Create an AI agent instance from configuration.

Args:
agent_config: Agent configuration containing name, instructions, etc.

Returns:
ChatAgent instance ready to use

Raises:
Exception: If agent creation fails
"""
try:
if self._azure_chat_client is None:
raise RuntimeError("AgentFactory not initialized. Use 'async with AgentFactory()' pattern.")

# Create agent using Microsoft Agent Framework pattern
agent = self.azure_chat_client.create_agent(
agent = self._azure_chat_client.create_agent(
instructions=agent_config.instructions,
name=agent_config.name
)
Expand All @@ -51,8 +93,15 @@ def create_agent(self, agent_config: Agent) -> ChatAgent:
logger.error(f"Error creating agent {agent_config.name}: {e}")
raise

def create_specialist_agent(self, agent_config: Agent) -> ChatAgent:
"""Create a specialist agent with enhanced instructions."""
async def create_specialist_agent(self, agent_config: Agent) -> ChatAgent:
"""Create a specialist agent with enhanced instructions.

Args:
agent_config: Agent configuration

Returns:
ChatAgent configured as a specialist
"""
# Enhance instructions for specialist behavior
specialist_instructions = f"""
You are a specialist agent with expertise in: {agent_config.description or 'your domain'}.
Expand All @@ -68,18 +117,32 @@ def create_specialist_agent(self, agent_config: Agent) -> ChatAgent:
5. Be precise and thorough in your responses
"""

if self._azure_chat_client is None:
raise RuntimeError("AgentFactory not initialized. Use 'async with AgentFactory()' pattern.")

# Create agent with enhanced instructions
agent = self.azure_chat_client.create_agent(
agent = self._azure_chat_client.create_agent(
instructions=specialist_instructions,
name=agent_config.name
)

logger.info(f"Created specialist agent: {agent_config.name}")
return agent

def create_workflow_agent(self, name: str, instructions: str) -> ChatAgent:
"""Create an agent specifically for workflow use."""
agent = self.azure_chat_client.create_agent(
async def create_workflow_agent(self, name: str, instructions: str) -> ChatAgent:
"""Create an agent specifically for workflow use.

Args:
name: Agent name
instructions: Agent instructions

Returns:
ChatAgent for workflow execution
"""
if self._azure_chat_client is None:
raise RuntimeError("AgentFactory not initialized. Use 'async with AgentFactory()' pattern.")

agent = self._azure_chat_client.create_agent(
instructions=instructions,
name=name
)
Expand All @@ -88,7 +151,15 @@ def create_workflow_agent(self, name: str, instructions: str) -> ChatAgent:
return agent

async def run_agent(self, agent: ChatAgent, message: str) -> str:
"""Run an agent with a message and return the response."""
"""Run an agent with a message and return the response.

Args:
agent: ChatAgent instance to run
message: User message to process

Returns:
Agent response as string
"""
try:
# Run the agent using the correct Agent Framework pattern
response = await agent.run(message)
Expand All @@ -106,7 +177,15 @@ async def run_agent(self, agent: ChatAgent, message: str) -> str:
raise

async def run_agent_streaming(self, agent: ChatAgent, message: str):
"""Run an agent with streaming response."""
"""Run an agent with streaming response.

Args:
agent: ChatAgent instance to run
message: User message to process

Yields:
Response updates as they arrive
"""
try:
# Run the agent with streaming using the correct Agent Framework pattern
async for update in agent.run_stream(message):
Expand All @@ -117,17 +196,34 @@ async def run_agent_streaming(self, agent: ChatAgent, message: str):
raise

def get_available_clients(self) -> Dict[str, Any]:
"""Get information about available clients."""
"""Get information about available clients.

Returns:
Dict with client availability information
"""
return {
'azure_openai_available': hasattr(self, 'azure_chat_client'),
'azure_openai_available': self._azure_chat_client is not None,
'agent_framework_available': True,
}

def get_supported_agent_types(self) -> List[str]:
"""Get list of supported agent types."""
"""Get list of supported agent types.

Returns:
List of supported agent type values
"""
return [
AgentType.CHAT_AGENT.value,
AgentType.SPECIALIST_AGENT.value,
AgentType.TOOL_AGENT.value,
AgentType.CUSTOM_AGENT.value
]
]

@property
def azure_chat_client(self) -> Optional[AzureOpenAIChatClient]:
"""Get the Azure chat client for backward compatibility.

Returns:
AzureOpenAIChatClient instance or None
"""
return self._azure_chat_client
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
from app.core.database import get_db
from app.models import WebSocketMessage, WorkflowExecutionEvent
from app.services.websocket_service import WebSocketManager
from app.services.execution_service import ExecutionService
from app.workflows.workflow_executor import WorkflowExecutor
from app.core.logging import get_logger

logger = get_logger(__name__)

router = APIRouter()

Expand All @@ -34,31 +39,121 @@ async def websocket_endpoint(websocket: WebSocket):
except WebSocketDisconnect:
websocket_manager.disconnect(websocket)
except Exception as e:
print(f"WebSocket error: {e}")
logger.error(f"WebSocket error: {e}")
websocket_manager.disconnect(websocket)


@router.websocket("/execution/{execution_id}")
async def execution_websocket(websocket: WebSocket, execution_id: int):
"""WebSocket endpoint for specific execution monitoring."""
async def execution_websocket(
websocket: WebSocket,
execution_id: int,
db: Session = Depends(get_db)
):
"""WebSocket endpoint for specific execution monitoring with real-time streaming.

This endpoint provides real-time updates for workflow execution including:
- Execution start/stop events
- Workflow progress updates
- Agent execution events
- Error notifications
- Completion status
"""
await websocket_manager.connect_to_execution(websocket, execution_id)

# Create execution service to fetch workflow details
execution_service = ExecutionService(db)
workflow_executor = WorkflowExecutor()

try:
# Start streaming task in background
async def stream_execution():
"""Stream execution events to the connected WebSocket."""
try:
# Get execution details
execution = await execution_service.get_execution(execution_id)
if not execution:
await websocket_manager.send_personal_message({
"type": "error",
"data": {
"message": f"Execution {execution_id} not found",
"execution_id": execution_id
}
}, websocket)
return

# Get workflow
workflow = await execution_service.get_workflow_for_execution(execution_id)
if not workflow:
await websocket_manager.send_personal_message({
"type": "error",
"data": {
"message": "Workflow not found for execution",
"execution_id": execution_id
}
}, websocket)
return

# Stream workflow execution events
async for event in workflow_executor.execute_with_events(
workflow=workflow.workflow_obj, # Actual Agent Framework workflow
Comment on lines +97 to +98
Copy link

Copilot AI Oct 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment suggests accessing 'workflow_obj' property, but this may not exist on the workflow model. Verify that the workflow object has this attribute or adjust the implementation accordingly.

Suggested change
async for event in workflow_executor.execute_with_events(
workflow=workflow.workflow_obj, # Actual Agent Framework workflow
# Check if workflow_obj exists, otherwise use workflow directly
wf_obj = getattr(workflow, "workflow_obj", None)
if wf_obj is None:
logger.error(f"'workflow_obj' attribute not found on workflow for execution {execution_id}")
await websocket_manager.send_personal_message({
"type": "error",
"data": {
"message": "'workflow_obj' attribute not found on workflow",
"execution_id": execution_id
}
}, websocket)
return
async for event in workflow_executor.execute_with_events(
workflow=wf_obj, # Actual Agent Framework workflow

Copilot uses AI. Check for mistakes.
input_data=execution.input_data,
execution_id=execution_id
):
# Broadcast event to all connected clients monitoring this execution
await websocket_manager.broadcast_to_execution(execution_id, {
"type": "execution_event",
"data": event
})

except Exception as e:
logger.error(f"Error streaming execution {execution_id}: {e}")
await websocket_manager.send_personal_message({
"type": "error",
"data": {
"message": f"Error streaming execution: {str(e)}",
"execution_id": execution_id
}
}, websocket)

# Start streaming in background
stream_task = asyncio.create_task(stream_execution())

# Handle client messages while streaming
while True:
# Keep connection alive and handle any client messages
data = await websocket.receive_text()
message_data = json.loads(data)

# Handle execution-specific messages
await websocket_manager.handle_execution_message(
websocket, execution_id, message_data
)
message_type = message_data.get("type")

if message_type == "cancel_execution":
# Cancel the execution
stream_task.cancel()
await websocket_manager.send_personal_message({
"type": "execution_cancelled",
"data": {"execution_id": execution_id}
}, websocket)
break
else:
await websocket_manager.handle_execution_message(
websocket, execution_id, message_data
)

except WebSocketDisconnect:
logger.info(f"WebSocket disconnected from execution {execution_id}")
websocket_manager.disconnect_from_execution(websocket, execution_id)
except Exception as e:
print(f"Execution WebSocket error: {e}")
logger.error(f"Execution WebSocket error: {e}")
websocket_manager.disconnect_from_execution(websocket, execution_id)
finally:
# Clean up streaming task if still running
if 'stream_task' in locals() and not stream_task.done():
stream_task.cancel()
try:
await stream_task
except asyncio.CancelledError:
pass


# HTTP endpoints for WebSocket management
Expand Down
Loading