Skip to content

Commit 1da8b0b

Browse files
committed
feat: Add comprehensive memory system for intelligent task planning and execution
- Implement MemoryBase with Redis-based persistent storage and semantic similarity search - Create AgentStatusManager for independent agent status management with 30-second time windows - Integrate memory system into MasterAgent for historical experience and enhanced planning - Integrate memory system into TaskExecAgent for task execution context and experience accumulation - Add MemoryManager for unified memory operations and automatic message integration - Support configurable similarity thresholds for memory clustering - Enable graceful degradation when Redis is unavailable, with fallback mechanisms - Add comprehensive documentation and configuration examples Note: The memory system is disabled by default and can be enabled via configuration file.
1 parent c79c9b9 commit 1da8b0b

File tree

20 files changed

+2740
-91
lines changed

20 files changed

+2740
-91
lines changed

master/agents/agent.py

Lines changed: 156 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
import yaml
1111
from agents.planner import GlobalTaskPlanner
12+
13+
# Import flagscale last to avoid path conflicts
1214
from flag_scale.flagscale.agent.collaboration import Collaborator
1315

1416

@@ -100,7 +102,6 @@ def _handle_result(self, data: str):
100102
subtask_handle = data.get("subtask_handle")
101103
subtask_result = data.get("subtask_result")
102104

103-
# TODO: Task result should be refered to the next step determination.
104105
if robot_name and subtask_handle and subtask_result:
105106
self.logger.info(
106107
f"================ Received result from {robot_name} ================"
@@ -198,9 +199,131 @@ def reasoning_and_subtasks_is_right(self, reasoning_and_subtasks: dict) -> bool:
198199
except (TypeError, KeyError):
199200
return False
200201

202+
def _save_task_data_to_json(self, task_id: str, task: str, reasoning_and_subtasks: dict):
203+
"""Save task data to JSON file - single file stores all tasks"""
204+
import os
205+
from datetime import datetime
206+
207+
log_dir = os.path.join(os.path.dirname(__file__), '..', '..', '.log')
208+
os.makedirs(log_dir, exist_ok=True)
209+
210+
json_file = os.path.join(log_dir, f"master_data_{task_id}.json")
211+
current_task_data = {
212+
"task_id": task_id,
213+
"task": task,
214+
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
215+
"reasoning_explanation": reasoning_and_subtasks.get("reasoning_explanation", ""),
216+
"subtask_list": reasoning_and_subtasks.get("subtask_list", []),
217+
"prompt_content": self._get_last_prompt_content()
218+
}
219+
220+
if os.path.exists(json_file):
221+
try:
222+
with open(json_file, 'r', encoding='utf-8') as f:
223+
data = json.load(f)
224+
except:
225+
data = {"tasks": []}
226+
else:
227+
data = {"tasks": []}
228+
229+
data["tasks"].append(current_task_data)
230+
with open(json_file, 'w', encoding='utf-8') as f:
231+
json.dump(data, f, ensure_ascii=False, indent=2)
232+
233+
def _get_last_prompt_content(self) -> str:
234+
"""Get the last prompt content"""
235+
if hasattr(self.planner, 'last_prompt_content'):
236+
return self.planner.last_prompt_content
237+
return ""
238+
239+
def _store_task_to_long_term_memory(self, task_id: str, task: str, reasoning_and_subtasks: dict):
240+
"""Store task decomposition results to long-term memory
241+
242+
Args:
243+
task_id: Task ID
244+
task: Original task description
245+
reasoning_and_subtasks: Task decomposition results
246+
"""
247+
if not hasattr(self.planner, 'long_term_memory'):
248+
self.logger.warning(f"Planner does not have long_term_memory attribute, cannot store task {task_id}")
249+
return
250+
if not self.planner.long_term_memory:
251+
self.logger.warning(f"Planner's long_term_memory is None, cannot store task {task_id}")
252+
return
253+
254+
self.logger.info(f"[LongTermMemory] Storing task {task_id} to long-term memory: {task[:50]}")
255+
256+
try:
257+
import time
258+
import sys
259+
import os
260+
import importlib.util
261+
262+
# Import TaskContext and CompactActionStep from slaver memory module
263+
_slaver_path = os.path.join(os.path.dirname(__file__), '..', '..', 'slaver')
264+
sys.path.insert(0, _slaver_path)
265+
try:
266+
from tools.memory import TaskContext, CompactActionStep
267+
except ImportError:
268+
# Fallback to direct file loading
269+
_memory_file = os.path.join(_slaver_path, 'tools', 'memory.py')
270+
spec = importlib.util.spec_from_file_location('memory_module', _memory_file)
271+
memory_module = importlib.util.module_from_spec(spec)
272+
spec.loader.exec_module(memory_module)
273+
TaskContext = memory_module.TaskContext
274+
CompactActionStep = memory_module.CompactActionStep
275+
276+
subtask_list = reasoning_and_subtasks.get("subtask_list", [])
277+
278+
tool_sequence = []
279+
for subtask in subtask_list:
280+
subtask_desc = subtask.get("subtask", "")
281+
if "Navigate" in subtask_desc:
282+
tool_sequence.append("Navigate")
283+
elif "Grasp" in subtask_desc:
284+
tool_sequence.append("Grasp")
285+
elif "Place" in subtask_desc:
286+
tool_sequence.append("Place")
287+
288+
start_time = time.time()
289+
290+
actions = []
291+
for i, subtask in enumerate(subtask_list, 1):
292+
subtask_desc = subtask.get("subtask", "")
293+
action = CompactActionStep(
294+
step_number=i,
295+
timestamp=start_time + i,
296+
tool_name=subtask_desc.split()[0] if subtask_desc else "unknown",
297+
tool_arguments={},
298+
tool_result_summary=f"Subtask: {subtask_desc}",
299+
success=True,
300+
duration=1.0,
301+
error_msg=None
302+
)
303+
actions.append(action)
304+
305+
task_context = TaskContext(
306+
task_id=task_id,
307+
task_text=task,
308+
start_time=start_time,
309+
actions=actions,
310+
end_time=start_time + len(subtask_list),
311+
success=True
312+
)
313+
stored_id = self.planner.long_term_memory.store_task_episode(task_context)
314+
self.logger.info(f"[LongTermMemory] ✅ Task {task_id} stored to long-term memory as {stored_id}")
315+
print(f"[LongTermMemory] ✅ Task {task_id} stored to long-term memory")
316+
317+
except Exception as e:
318+
error_msg = f"Failed to store task {task_id} to long-term memory: {e}"
319+
self.logger.warning(error_msg)
320+
print(f"[LongTermMemory] ❌ {error_msg}")
321+
import traceback
322+
self.logger.warning(traceback.format_exc())
323+
201324
def publish_global_task(self, task: str, refresh: bool, task_id: str) -> Dict:
202325
"""Publish a global task to all Agents"""
203-
self.logger.info(f"Publishing global task: {task}")
326+
print(f"[TASK_START:{task_id}] {task}")
204327

205328
response = self.planner.forward(task)
206329
reasoning_and_subtasks = self._extract_json(response)
@@ -210,25 +333,45 @@ def publish_global_task(self, task: str, refresh: bool, task_id: str) -> Dict:
210333
while (not self.reasoning_and_subtasks_is_right(reasoning_and_subtasks)) and (
211334
attempt < self.config["model"]["model_retry_planning"]
212335
):
213-
self.logger.warning(
214-
f"[WARNING] JSON extraction failed after {self.config['model']['model_retry_planning']} attempts."
215-
)
216-
self.logger.error(
217-
f"[ERROR] Task ({task}) failed to be decomposed into subtasks, it will be ignored."
218-
)
219-
self.logger.warning(
220-
f"Attempt {attempt + 1} to extract JSON failed. Retrying..."
221-
)
222336
response = self.planner.forward(task)
223337
reasoning_and_subtasks = self._extract_json(response)
224338
attempt += 1
225339

226-
self.logger.info(f"Received reasoning and subtasks:\n{reasoning_and_subtasks}")
340+
if reasoning_and_subtasks is None:
341+
reasoning_and_subtasks = {"error": "Failed to extract valid task decomposition"}
342+
print(f"[MASTER_RESPONSE:{task_id}] {json.dumps(reasoning_and_subtasks, ensure_ascii=False)}")
343+
344+
self._save_task_data_to_json(task_id, task, reasoning_and_subtasks)
345+
if reasoning_and_subtasks and "error" not in reasoning_and_subtasks:
346+
self._store_task_to_long_term_memory(task_id, task, reasoning_and_subtasks)
347+
227348
subtask_list = reasoning_and_subtasks.get("subtask_list", [])
228349
grouped_tasks = self._group_tasks_by_order(subtask_list)
229350

230351
task_id = task_id or str(uuid.uuid4()).replace("-", "")
231352

353+
try:
354+
from subtask_analyzer import SubtaskAnalyzer
355+
import os
356+
log_dir = os.path.join(os.path.dirname(__file__), '..', '..', '.log')
357+
analyzer = SubtaskAnalyzer(log_dir=log_dir)
358+
if isinstance(task, list):
359+
task_str = task[0] if task else str(task)
360+
else:
361+
task_str = str(task)
362+
363+
decomposition_record = analyzer.record_decomposition(
364+
task_id=task_id,
365+
original_task=task_str,
366+
reasoning_and_subtasks=reasoning_and_subtasks
367+
)
368+
self.logger.info(f"Subtask decomposition recorded: {decomposition_record.decomposition_quality}")
369+
self.logger.info(f"Decomposition details: {len(subtask_list)} subtasks")
370+
for i, subtask in enumerate(subtask_list, 1):
371+
self.logger.info(f" {i}. [{subtask.get('robot_name', 'unknown')}] {subtask.get('subtask', '')}")
372+
except Exception as e:
373+
self.logger.warning(f"Failed to record subtask: {e}")
374+
232375
threading.Thread(
233376
target=asyncio.run,
234377
args=(self._dispath_subtasks_async(task, task_id, grouped_tasks, refresh),),
@@ -258,5 +401,5 @@ async def _dispath_subtasks_async(
258401
)
259402
working_robots.append(robot_name)
260403
self.collaborator.update_agent_busy(robot_name, True)
261-
self.collaborator.wait_agents_free(working_robots)
404+
self.logger.info(f"Tasks sent to {len(working_robots)} agents, executing asynchronously...")
262405
self.logger.info(f"Task_id ({task_id}) [{task}] has been sent to all agents.")

0 commit comments

Comments
 (0)