Skip to content
160 changes: 82 additions & 78 deletions src/crewai/agent.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,36 @@
import shutil
import subprocess
import time
from collections.abc import Callable, Sequence
from typing import (
Any,
Callable,
Dict,
List,
Literal,
Optional,
Sequence,
Tuple,
Type,
Union,
cast,
)

from pydantic import Field, InstanceOf, PrivateAttr, model_validator

from crewai.agents import CacheHandler
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.agent_builder.base_agent import BaseAgent, PlatformAppOrAction
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
from crewai.events.types.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeQueryStartedEvent,
KnowledgeRetrievalCompletedEvent,
KnowledgeRetrievalStartedEvent,
KnowledgeSearchQueryFailedEvent,
)
from crewai.events.types.memory_events import (
MemoryRetrievalCompletedEvent,
MemoryRetrievalStartedEvent,
)
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
Expand All @@ -38,24 +50,6 @@
)
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import generate_model_description
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.memory_events import (
MemoryRetrievalStartedEvent,
MemoryRetrievalCompletedEvent,
)
from crewai.events.types.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeQueryStartedEvent,
KnowledgeRetrievalCompletedEvent,
KnowledgeRetrievalStartedEvent,
KnowledgeSearchQueryFailedEvent,
)
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
Expand Down Expand Up @@ -84,39 +78,40 @@ class Agent(BaseAgent):
step_callback: Callback to be executed after each step of the agent execution.
knowledge_sources: Knowledge sources for the agent.
embedder: Embedder configuration for the agent.
apps: List of applications that the agent can access through CrewAI Platform.
"""

_times_executed: int = PrivateAttr(default=0)
max_execution_time: Optional[int] = Field(
max_execution_time: int | None = Field(
default=None,
description="Maximum execution time for an agent to execute a task",
)
agent_ops_agent_name: str = None # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
agent_ops_agent_id: str = None # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
step_callback: Optional[Any] = Field(
step_callback: Any | None = Field(
default=None,
description="Callback to be executed after each step of the agent execution.",
)
use_system_prompt: Optional[bool] = Field(
use_system_prompt: bool | None = Field(
default=True,
description="Use system prompt for the agent.",
)
llm: Union[str, InstanceOf[BaseLLM], Any] = Field(
llm: str | InstanceOf[BaseLLM] | Any = Field(
description="Language model that will run the agent.", default=None
)
function_calling_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
function_calling_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
description="Language model that will run the agent.", default=None
)
system_template: Optional[str] = Field(
system_template: str | None = Field(
default=None, description="System format for the agent."
)
prompt_template: Optional[str] = Field(
prompt_template: str | None = Field(
default=None, description="Prompt format for the agent."
)
response_template: Optional[str] = Field(
response_template: str | None = Field(
default=None, description="Response format for the agent."
)
allow_code_execution: Optional[bool] = Field(
allow_code_execution: bool | None = Field(
default=False, description="Enable code execution for the agent."
)
respect_context_window: bool = Field(
Expand Down Expand Up @@ -147,31 +142,31 @@ class Agent(BaseAgent):
default=False,
description="Whether the agent should reflect and create a plan before executing a task.",
)
max_reasoning_attempts: Optional[int] = Field(
max_reasoning_attempts: int | None = Field(
default=None,
description="Maximum number of reasoning attempts before executing the task. If None, will try until ready.",
)
embedder: Optional[Dict[str, Any]] = Field(
embedder: dict[str, Any] | None = Field(
default=None,
description="Embedder configuration for the agent.",
)
agent_knowledge_context: Optional[str] = Field(
agent_knowledge_context: str | None = Field(
default=None,
description="Knowledge context for the agent.",
)
crew_knowledge_context: Optional[str] = Field(
crew_knowledge_context: str | None = Field(
default=None,
description="Knowledge context for the crew.",
)
knowledge_search_query: Optional[str] = Field(
knowledge_search_query: str | None = Field(
default=None,
description="Knowledge search query for the agent dynamically generated by the agent.",
)
from_repository: Optional[str] = Field(
from_repository: str | None = Field(
default=None,
description="The Agent's role to be used from your repository.",
)
guardrail: Optional[Union[Callable[[Any], Tuple[bool, Any]], str]] = Field(
guardrail: Callable[[Any], tuple[bool, Any]] | str | None = Field(
default=None,
description="Function or string description of a guardrail to validate agent output",
)
Expand All @@ -180,6 +175,7 @@ class Agent(BaseAgent):
)

@model_validator(mode="before")
@classmethod
def validate_from_repository(cls, v):
if v is not None and (from_repository := v.get("from_repository")):
return load_agent_from_repository(from_repository) | v
Expand Down Expand Up @@ -208,7 +204,7 @@ def _setup_agent_executor(self):
self.cache_handler = CacheHandler()
self.set_cache_handler(self.cache_handler)

def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
def set_knowledge(self, crew_embedder: dict[str, Any] | None = None):
try:
if self.embedder is None and crew_embedder:
self.embedder = crew_embedder
Expand All @@ -224,7 +220,7 @@ def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
)
self.knowledge.add_sources()
except (TypeError, ValueError) as e:
raise ValueError(f"Invalid Knowledge Configuration: {str(e)}")
raise ValueError(f"Invalid Knowledge Configuration: {e!s}") from e

def _is_any_available_memory(self) -> bool:
"""Check if any memory is available."""
Expand All @@ -244,8 +240,8 @@ def _is_any_available_memory(self) -> bool:
def execute_task(
self,
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
context: str | None = None,
tools: list[BaseTool] | None = None,
) -> str:
"""Execute a task with the agent.

Expand Down Expand Up @@ -277,13 +273,9 @@ def execute_task(
# Add the reasoning plan to the task description
task.description += f"\n\nReasoning Plan:\n{reasoning_output.plan.plan}"
except Exception as e:
if hasattr(self, "_logger"):
self._logger.log(
"error", f"Error during reasoning process: {str(e)}"
)
else:
print(f"Error during reasoning process: {str(e)}")

self._logger.log(
"error", f"Error during reasoning process: {e!s}"
)
self._inject_date_to_task(task)

if self.tools_handler:
Expand Down Expand Up @@ -335,7 +327,7 @@ def execute_task(
agent=self,
task=task,
)
memory = contextual_memory.build_context_for_task(task, context)
memory = contextual_memory.build_context_for_task(task, context or "")
if memory.strip() != "":
task_prompt += self.i18n.slice("memory").format(memory=memory)

Expand Down Expand Up @@ -525,14 +517,14 @@ def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> s

try:
return future.result(timeout=timeout)
except concurrent.futures.TimeoutError:
except concurrent.futures.TimeoutError as e:
future.cancel()
raise TimeoutError(
f"Task '{task.description}' execution timed out after {timeout} seconds. Consider increasing max_execution_time or optimizing the task."
)
) from e
except Exception as e:
future.cancel()
raise RuntimeError(f"Task execution failed: {str(e)}")
raise RuntimeError(f"Task execution failed: {e!s}") from e

def _execute_without_timeout(self, task_prompt: str, task: Task) -> str:
"""Execute a task without a timeout.
Expand All @@ -554,14 +546,14 @@ def _execute_without_timeout(self, task_prompt: str, task: Task) -> str:
)["output"]

def create_agent_executor(
self, tools: Optional[List[BaseTool]] = None, task=None
self, tools: list[BaseTool] | None = None, task=None
) -> None:
"""Create an agent executor for the agent.

Returns:
An instance of the CrewAgentExecutor class.
"""
raw_tools: List[BaseTool] = tools or self.tools or []
raw_tools: list[BaseTool] = tools or self.tools or []
parsed_tools = parse_tools(raw_tools)

prompt = Prompts(
Expand All @@ -587,7 +579,7 @@ def create_agent_executor(
agent=self,
crew=self.crew,
tools=parsed_tools,
prompt=prompt,
prompt=cast(dict[str, str], prompt),
original_tools=raw_tools,
stop_words=stop_words,
max_iter=self.max_iter,
Expand All @@ -603,10 +595,18 @@ def create_agent_executor(
callbacks=[TokenCalcHandler(self._token_process)],
)

def get_delegation_tools(self, agents: List[BaseAgent]):
def get_delegation_tools(self, agents: list[BaseAgent]):
agent_tools = AgentTools(agents=agents)
tools = agent_tools.tools()
return tools
return agent_tools.tools()

def get_platform_tools(self, apps: list[PlatformAppOrAction]) -> list[BaseTool]:
try:
from crewai_tools import CrewaiPlatformTools # type: ignore[import-untyped]

return CrewaiPlatformTools(apps=apps)
except Exception as e:
self._logger.log("error", f"Error getting platform tools: {e!s}")
return []

def get_multimodal_tools(self) -> Sequence[BaseTool]:
from crewai.tools.agent_tools.add_image_tool import AddImageTool
Expand Down Expand Up @@ -654,7 +654,7 @@ def _use_trained_data(self, task_prompt: str) -> str:
)
return task_prompt

def _render_text_description(self, tools: List[Any]) -> str:
def _render_text_description(self, tools: list[Any]) -> str:
"""Render the tool name and description in plain text.

Output will be in the format of:
Expand All @@ -664,14 +664,13 @@ def _render_text_description(self, tools: List[Any]) -> str:
search: This tool is used for search
calculator: This tool is used for math
"""
description = "\n".join(
return "\n".join(
[
f"Tool name: {tool.name}\nTool description:\n{tool.description}"
for tool in tools
]
)

return description

def _inject_date_to_task(self, task):
"""Inject the current date into the task description if inject_date is enabled."""
Expand Down Expand Up @@ -700,28 +699,33 @@ def _inject_date_to_task(self, task):
task.description += f"\n\nCurrent Date: {current_date}"
except Exception as e:
if hasattr(self, "_logger"):
self._logger.log("warning", f"Failed to inject date: {str(e)}")
self._logger.log("warning", f"Failed to inject date: {e!s}")
else:
print(f"Warning: Failed to inject date: {str(e)}")
print(f"Warning: Failed to inject date: {e!s}")

def _validate_docker_installation(self) -> None:
"""Check if Docker is installed and running."""
if not shutil.which("docker"):
docker_path = shutil.which("docker")
if not docker_path:
raise RuntimeError(
f"Docker is not installed. Please install Docker to use code execution with agent: {self.role}"
)

try:
subprocess.run(
["docker", "info"],
subprocess.run( # noqa: S603
[docker_path, "info"],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except subprocess.CalledProcessError:
except subprocess.CalledProcessError as e:
raise RuntimeError(
f"Docker is not running. Please start Docker to use code execution with agent: {self.role}"
)
) from e
except subprocess.TimeoutExpired as e:
raise RuntimeError(
f"Docker command timed out. Please check your Docker installation for agent: {self.role}"
) from e

def __repr__(self):
return f"Agent(role={self.role}, goal={self.goal}, backstory={self.backstory})"
Expand Down Expand Up @@ -796,8 +800,8 @@ def _get_knowledge_search_query(self, task_prompt: str) -> str | None:

def kickoff(
self,
messages: Union[str, List[Dict[str, str]]],
response_format: Optional[Type[Any]] = None,
messages: str | list[dict[str, str]],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""
Execute the agent with the given messages using a LiteAgent instance.
Expand Down Expand Up @@ -836,8 +840,8 @@ def kickoff(

async def kickoff_async(
self,
messages: Union[str, List[Dict[str, str]]],
response_format: Optional[Type[Any]] = None,
messages: str | list[dict[str, str]],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""
Execute the agent asynchronously with the given messages using a LiteAgent instance.
Expand Down
Loading