- from typing import Any, Callable, Dict, List, Optional, Union
+ """Agent executor for crew AI agents.
+
+ Handles agent execution flow including LLM interactions, tool execution,
+ and memory management.
+ """
+
+ from collections.abc import Callable
+ from typing import Any

from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.parser import (
    AgentAction,
    AgentFinish,
    OutputParserException,
)
from crewai.agents.tools_handler import ToolsHandler
- from crewai.llm import BaseLLM
- from crewai.tools.base_tool import BaseTool
+ from crewai.events.event_bus import crewai_event_bus
+ from crewai.events.types.logging_events import (
+     AgentLogsExecutionEvent,
+     AgentLogsStartedEvent,
+ )
+ from crewai.llms.base_llm import BaseLLM
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_types import ToolResult
from crewai.utilities import I18N, Printer
    is_context_length_exceeded,
    process_llm_response,
)
- from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE
- from crewai.utilities.logger import Logger
+ from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.tool_utils import execute_tool_and_check_finality
from crewai.utilities.training_handler import CrewTrainingHandler
- from crewai.events.types.logging_events import (
-     AgentLogsStartedEvent,
-     AgentLogsExecutionEvent,
- )
- from crewai.events.event_bus import crewai_event_bus


class CrewAgentExecutor(CrewAgentExecutorMixin):
-     _logger: Logger = Logger()
+     """Executor for crew agents.
+
+     Manages the execution lifecycle of an agent including prompt formatting,
+     LLM interactions, tool execution, and feedback handling.
+     """

    def __init__(
        self,
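Most of the churn in this hunk is a typing cleanup: `List`, `Dict`, `Optional`, and `Union` from `typing` give way to built-in generics and PEP 604 unions, and `Callable` now comes from `collections.abc`. A minimal sketch of the annotation style this diff moves to (illustrative only, not code from this module; built-in generics need Python 3.9+ and `X | None` needs 3.10+):

    from collections.abc import Callable
    from typing import Any

    # Before: List[str], Dict[str, Any], Optional[Callable[[], bool]]
    # After, as used throughout this diff:
    def run(tools: list[str], limit: Callable[[], bool] | None = None) -> dict[str, Any]:
        # Built-in generics replace typing.List/Dict; "| None" replaces Optional.
        return {"tools": tools, "rate_limited": limit is not None and not limit()}

    print(run(["search"], limit=lambda: True))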
@@ -48,18 +57,39 @@ def __init__(
        agent: BaseAgent,
        prompt: dict[str, str],
        max_iter: int,
-         tools: List[CrewStructuredTool],
+         tools: list[CrewStructuredTool],
        tools_names: str,
-         stop_words: List[str],
+         stop_words: list[str],
        tools_description: str,
        tools_handler: ToolsHandler,
        step_callback: Any = None,
-         original_tools: List[Any] | None = None,
+         original_tools: list[Any] | None = None,
        function_calling_llm: Any = None,
        respect_context_window: bool = False,
-         request_within_rpm_limit: Optional[Callable[[], bool]] = None,
-         callbacks: List[Any] | None = None,
-     ):
+         request_within_rpm_limit: Callable[[], bool] | None = None,
+         callbacks: list[Any] | None = None,
+     ) -> None:
+         """Initialize executor.
+
+         Args:
+             llm: Language model instance.
+             task: Task to execute.
+             crew: Crew instance.
+             agent: Agent to execute.
+             prompt: Prompt templates.
+             max_iter: Maximum iterations.
+             tools: Available tools.
+             tools_names: Tool names string.
+             stop_words: Stop word list.
+             tools_description: Tool descriptions.
+             tools_handler: Tool handler instance.
+             step_callback: Optional step callback.
+             original_tools: Original tool list.
+             function_calling_llm: Optional function calling LLM.
+             respect_context_window: Respect context limits.
+             request_within_rpm_limit: RPM limit check function.
+             callbacks: Optional callbacks list.
+         """
        self._i18n: I18N = I18N()
        self.llm: BaseLLM = llm
        self.task = task
@@ -81,12 +111,9 @@ def __init__(
        self.respect_context_window = respect_context_window
        self.request_within_rpm_limit = request_within_rpm_limit
        self.ask_for_human_input = False
-         self.messages: List[Dict[str, str]] = []
+         self.messages: list[dict[str, str]] = []
        self.iterations = 0
        self.log_error_after = 3
-         self.tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool]] = {
-             tool.name: tool for tool in self.tools
-         }
        existing_stop = self.llm.stop or []
        self.llm.stop = list(
            set(
@@ -96,7 +123,15 @@ def __init__(
            )
        )

-     def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
+     def invoke(self, inputs: dict[str, str]) -> dict[str, Any]:
+         """Execute the agent with given inputs.
+
+         Args:
+             inputs: Input dictionary containing prompt variables.
+
+         Returns:
+             Dictionary with agent output.
+         """
        if "system" in self.prompt:
            system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs)
            user_prompt = self._format_prompt(self.prompt.get("user", ""), inputs)
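`invoke` expects an `inputs` dict whose values are spliced into the system and user prompt templates; as the `_format_prompt` hunk further down shows, the templates use `{input}`, `{tool_names}`, and `{tools}` placeholders. A small self-contained sketch of that substitution (the template text and values here are made up for illustration):

    def format_prompt(prompt: str, inputs: dict[str, str]) -> str:
        # Same plain string substitution the executor's _format_prompt performs.
        prompt = prompt.replace("{input}", inputs["input"])
        prompt = prompt.replace("{tool_names}", inputs["tool_names"])
        prompt = prompt.replace("{tools}", inputs["tools"])
        return prompt

    # Hypothetical template and values, only to show the expected keys.
    template = "Task: {input}\nTools available: {tool_names}\n{tools}"
    print(format_prompt(template, {
        "input": "Summarise the quarterly report",
        "tool_names": "search",
        "tools": "search: look up supporting facts",
    }))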
@@ -131,9 +166,10 @@ def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
        return {"output": formatted_answer.output}

    def _invoke_loop(self) -> AgentFinish:
-         """
-         Main loop to invoke the agent's thought process until it reaches a conclusion
-         or the maximum number of iterations is reached.
+         """Execute agent loop until completion.
+
+         Returns:
+             Final answer from the agent.
        """
        formatted_answer = None
        while not isinstance(formatted_answer, AgentFinish):
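The loop body (largely unchanged in this diff) keeps calling the LLM, parsing the reply into either an `AgentAction` or an `AgentFinish`, executing tools for actions, and only exits once an `AgentFinish` is produced or an error path breaks out. A toy sketch of that control-flow shape, with stand-in types rather than the real crewai classes:

    from dataclasses import dataclass

    @dataclass
    class Action:  # stand-in for AgentAction
        tool: str

    @dataclass
    class Finish:  # stand-in for AgentFinish
        output: str

    def invoke_loop(step) -> Finish:
        # `step` plays the role of one LLM call + parse + optional tool execution.
        formatted_answer = None
        while not isinstance(formatted_answer, Finish):
            formatted_answer = step(formatted_answer)
        return formatted_answer

    # Toy step function: act once, then finish.
    print(invoke_loop(lambda prev: Finish("done") if prev else Action("search")))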
@@ -190,7 +226,7 @@ def _invoke_loop(self) -> AgentFinish:
                    )

                self._invoke_step_callback(formatted_answer)
-                 self._append_message(formatted_answer.text, role="assistant")
+                 self._append_message(formatted_answer.text)

            except OutputParserException as e:
                formatted_answer = handle_output_parser_exception(
@@ -231,8 +267,16 @@ def _invoke_loop(self) -> AgentFinish:

    def _handle_agent_action(
        self, formatted_answer: AgentAction, tool_result: ToolResult
-     ) -> Union[AgentAction, AgentFinish]:
-         """Handle the AgentAction, execute tools, and process the results."""
+     ) -> AgentAction | AgentFinish:
+         """Process agent action and tool execution.
+
+         Args:
+             formatted_answer: Agent's action to execute.
+             tool_result: Result from tool execution.
+
+         Returns:
+             Updated action or final answer.
+         """
        # Special case for add_image_tool
        add_image_tool = self._i18n.tools("add_image")
        if (
@@ -251,17 +295,28 @@ def _handle_agent_action(
            show_logs=self._show_logs,
        )

-     def _invoke_step_callback(self, formatted_answer) -> None:
-         """Invoke the step callback if it exists."""
+     def _invoke_step_callback(
+         self, formatted_answer: AgentAction | AgentFinish
+     ) -> None:
+         """Invoke step callback.
+
+         Args:
+             formatted_answer: Current agent response.
+         """
        if self.step_callback:
            self.step_callback(formatted_answer)

    def _append_message(self, text: str, role: str = "assistant") -> None:
-         """Append a message to the message list with the given role."""
+         """Add message to conversation history.
+
+         Args:
+             text: Message content.
+             role: Message role (default: assistant).
+         """
        self.messages.append(format_message_for_llm(text, role=role))

-     def _show_start_logs(self):
-         """Show logs for the start of agent execution."""
+     def _show_start_logs(self) -> None:
+         """Emit agent start event."""
        if self.agent is None:
            raise ValueError("Agent cannot be None")

@@ -277,8 +332,12 @@ def _show_start_logs(self):
            ),
        )

-     def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]):
-         """Show logs for the agent's execution."""
+     def _show_logs(self, formatted_answer: AgentAction | AgentFinish) -> None:
+         """Emit agent execution event.
+
+         Args:
+             formatted_answer: Agent's response to log.
+         """
        if self.agent is None:
            raise ValueError("Agent cannot be None")

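With the module-level `Logger` removed, `_show_start_logs` and `_show_logs` now publish `AgentLogsStartedEvent` / `AgentLogsExecutionEvent` on `crewai_event_bus` and leave rendering to whoever subscribes. A hedged sketch of a subscriber, assuming the decorator-style `crewai_event_bus.on(...)` registration and the `(source, event)` handler signature used elsewhere in CrewAI's event system:

    from crewai.events.event_bus import crewai_event_bus
    from crewai.events.types.logging_events import AgentLogsStartedEvent

    # Registration API assumed from CrewAI's event-bus conventions; verify it
    # against the installed version before relying on it.
    @crewai_event_bus.on(AgentLogsStartedEvent)
    def on_agent_started(source, event):
        # `source` is the emitter (the agent); `event` carries the log payload.
        print(f"agent started: {event}")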
@@ -292,44 +351,16 @@ def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]):
            ),
        )

-     def _summarize_messages(self) -> None:
-         messages_groups = []
-         for message in self.messages:
-             content = message["content"]
-             cut_size = self.llm.get_context_window_size()
-             for i in range(0, len(content), cut_size):
-                 messages_groups.append({"content": content[i : i + cut_size]})
-
-         summarized_contents = []
-         for group in messages_groups:
-             summary = self.llm.call(
-                 [
-                     format_message_for_llm(
-                         self._i18n.slice("summarizer_system_message"), role="system"
-                     ),
-                     format_message_for_llm(
-                         self._i18n.slice("summarize_instruction").format(
-                             group=group["content"]
-                         ),
-                     ),
-                 ],
-                 callbacks=self.callbacks,
-             )
-             summarized_contents.append({"content": str(summary)})
-
-         merged_summary = " ".join(content["content"] for content in summarized_contents)
-
-         self.messages = [
-             format_message_for_llm(
-                 self._i18n.slice("summary").format(merged_summary=merged_summary)
-             )
-         ]
-
    def _handle_crew_training_output(
-         self, result: AgentFinish, human_feedback: Optional[str] = None
+         self, result: AgentFinish, human_feedback: str | None = None
    ) -> None:
-         """Handle the process of saving training data."""
-         agent_id = str(self.agent.id)  # type: ignore
+         """Save training data.
+
+         Args:
+             result: Agent's final output.
+             human_feedback: Optional feedback from human.
+         """
+         agent_id = str(self.agent.id)
        train_iteration = (
            getattr(self.crew, "_train_iteration", None) if self.crew else None
        )
@@ -371,20 +402,30 @@ def _handle_crew_training_output(
            training_data[agent_id] = agent_training_data
            training_handler.save(training_data)

-     def _format_prompt(self, prompt: str, inputs: Dict[str, str]) -> str:
+     @staticmethod
+     def _format_prompt(prompt: str, inputs: dict[str, str]) -> str:
+         """Format prompt with input values.
+
+         Args:
+             prompt: Template string.
+             inputs: Values to substitute.
+
+         Returns:
+             Formatted prompt.
+         """
        prompt = prompt.replace("{input}", inputs["input"])
        prompt = prompt.replace("{tool_names}", inputs["tool_names"])
        prompt = prompt.replace("{tools}", inputs["tools"])
        return prompt

    def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
-         """Handle human feedback with different flows for training vs regular use.
+         """Process human feedback.

        Args:
-             formatted_answer: The initial AgentFinish result to get feedback on
+             formatted_answer: Initial agent result.

        Returns:
-             AgentFinish: The final answer after processing feedback
+             Final answer after feedback.
        """
        human_feedback = self._ask_human_input(formatted_answer.output)

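Once a final answer exists, the executor can collect human feedback and then branch: in training mode (`_is_training_mode`, i.e. `self.crew._train` is set) the feedback is recorded once via `_handle_crew_training_output`, otherwise `_handle_regular_feedback` iterates, appending each round of feedback to the conversation and re-running `_invoke_loop` (see `_process_feedback_iteration` further down). A simplified, self-contained sketch of that dispatch, with stand-in callables rather than the real method bodies:

    def handle_human_feedback(answer: str, ask, training_mode: bool, train_once, iterate) -> str:
        feedback = ask(answer)
        if training_mode:
            return train_once(answer, feedback)  # single training pass, feedback is persisted
        return iterate(answer, feedback)         # regular path: revise using the feedback

    # Toy wiring just to show the shape of the flow.
    print(handle_human_feedback(
        "draft answer",
        ask=lambda a: "make it shorter",
        training_mode=False,
        train_once=lambda a, f: a,
        iterate=lambda a, f: f"{a} (revised per: {f})",
    ))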
@@ -394,13 +435,25 @@ def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
        return self._handle_regular_feedback(formatted_answer, human_feedback)

    def _is_training_mode(self) -> bool:
-         """Check if crew is in training mode."""
+         """Check if training mode is active.
+
+         Returns:
+             True if in training mode.
+         """
        return bool(self.crew and self.crew._train)

    def _handle_training_feedback(
        self, initial_answer: AgentFinish, feedback: str
    ) -> AgentFinish:
-         """Process feedback for training scenarios with single iteration."""
+         """Process training feedback.
+
+         Args:
+             initial_answer: Initial agent output.
+             feedback: Training feedback.
+
+         Returns:
+             Improved answer.
+         """
        self._handle_crew_training_output(initial_answer, feedback)
        self.messages.append(
            format_message_for_llm(
@@ -415,7 +468,15 @@ def _handle_training_feedback(
    def _handle_regular_feedback(
        self, current_answer: AgentFinish, initial_feedback: str
    ) -> AgentFinish:
-         """Process feedback for regular use with potential multiple iterations."""
+         """Process regular feedback iteratively.
+
+         Args:
+             current_answer: Current agent output.
+             initial_feedback: Initial user feedback.
+
+         Returns:
+             Final answer after iterations.
+         """
        feedback = initial_feedback
        answer = current_answer

@@ -430,30 +491,17 @@ def _handle_regular_feedback(
        return answer

    def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
-         """Process a single feedback iteration."""
+         """Process single feedback iteration.
+
+         Args:
+             feedback: User feedback.
+
+         Returns:
+             Updated agent response.
+         """
        self.messages.append(
            format_message_for_llm(
                self._i18n.slice("feedback_instructions").format(feedback=feedback)
            )
        )
        return self._invoke_loop()
-
-     def _log_feedback_error(self, retry_count: int, error: Exception) -> None:
-         """Log feedback processing errors."""
-         self._printer.print(
-             content=(
-                 f"Error processing feedback: {error}. "
-                 f"Retrying... ({retry_count + 1}/{MAX_LLM_RETRY})"
-             ),
-             color="red",
-         )
-
-     def _log_max_retries_exceeded(self) -> None:
-         """Log when max retries for feedback processing are exceeded."""
-         self._printer.print(
-             content=(
-                 f"Failed to process feedback after {MAX_LLM_RETRY} attempts. "
-                 "Ending feedback loop."
-             ),
-             color="red",
-         )