Merged
2 changes: 2 additions & 0 deletions reme_ai/__init__.py
@@ -8,6 +8,7 @@
from . import agent # noqa: E402
from . import config # noqa: E402
from . import constants # noqa: E402
from . import context # noqa: E402
from . import enumeration # noqa: E402
from . import retrieve # noqa: E402
from . import schema # noqa: E402
@@ -21,6 +22,7 @@
"agent",
"config",
"constants",
"context",
"enumeration",
"retrieve",
"schema",
81 changes: 78 additions & 3 deletions reme_ai/config/default.yaml
@@ -156,6 +156,83 @@ flow:
description: "user query"
required: true

context_offload:
flow_content: ContextOffloadOp() >> BatchWriteFileOp()
description: "Manages context window limits by compacting tool messages and compressing conversation history. First compacts large tool messages by storing full content in external files, then applies LLM-based compression if compaction ratio exceeds threshold. This helps reduce token usage while preserving important information."
input_schema:
messages:
type: array
description: "List of conversation messages to process for context offloading"
required: true
context_manage_mode:
type: string
description: "Context management mode: 'compact' only applies compaction to tool messages, 'compress' only applies LLM-based compression, 'auto' applies compaction first then compression if compaction ratio exceeds threshold. Defaults to 'auto'."
required: false
enum: ["compact", "compress", "auto"]
max_total_tokens:
type: integer
description: "Maximum token count threshold for triggering compression/compaction. For compaction, this is the total token count threshold. For compression, this excludes keep_recent_count messages and system messages. Defaults to 20000."
required: false
max_tool_message_tokens:
type: integer
description: "Maximum token count per tool message before compaction is applied. Tool messages exceeding this threshold will have their full content stored in external files with only a preview kept in context. Defaults to 2000."
required: false
group_token_threshold:
type: integer
description: "Maximum token count per compression group when using LLM-based compression. If None or 0, all messages are compressed in a single group. Messages exceeding this threshold individually will form their own group. Only used in 'compress' or 'auto' mode."
required: false
keep_recent_count:
type: integer
description: "Number of recent messages to preserve without compression or compaction. These messages remain unchanged to maintain conversation context. Defaults to 1 for compaction and 2 for compression."
required: false
store_dir:
type: string
description: "Directory path for storing offloaded message content. Full tool message content and compressed message groups are saved as files in this directory. Required for compaction and compression operations."
required: false
chat_id:
type: string
description: "Unique identifier for the chat session, used for file naming when storing compressed message groups. If not provided, a UUID will be generated automatically."
required: false
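
For orientation, here is a minimal, hypothetical request payload for the `context_offload` flow, assembled purely from the input schema above. Only `messages` is required; the other keys are shown with their documented defaults, and the message contents, `store_dir`, and `chat_id` values are invented for illustration.

```python
# Hypothetical payload for the context_offload flow (all values illustrative).
payload = {
    "messages": [
        {"role": "system", "content": "You are a coding agent."},
        {"role": "user", "content": "Summarize the repository."},
        {"role": "tool", "tool_call_id": "call_1", "content": "...very long tool output..."},
        {"role": "assistant", "content": "Done."},
    ],
    "context_manage_mode": "auto",    # "compact" | "compress" | "auto"
    "max_total_tokens": 20000,        # total-token trigger for offloading
    "max_tool_message_tokens": 2000,  # per-tool-message compaction trigger
    "keep_recent_count": 2,           # recent messages left untouched
    "store_dir": "./context_store",   # where offloaded content is written
    "chat_id": "demo-chat-001",       # used in file names for compressed groups
}
```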

context_offload_for_agentscope:
flow_content: ContextOffloadOp()
description: "Context offload operation for AgentScope integration. Manages context window limits by compacting tool messages and compressing conversation history without batch file writing. Same functionality as context_offload but without the BatchWriteFileOp step."
input_schema:
messages:
type: array
description: "List of conversation messages to process for context offloading"
required: true
context_manage_mode:
type: string
description: "Context management mode: 'compact' only applies compaction to tool messages, 'compress' only applies LLM-based compression, 'auto' applies compaction first then compression if compaction ratio exceeds threshold. Defaults to 'auto'."
required: false
enum: ["compact", "compress", "auto"]
max_total_tokens:
type: integer
description: "Maximum token count threshold for triggering compression/compaction. For compaction, this is the total token count threshold. For compression, this excludes keep_recent_count messages and system messages. Defaults to 20000."
required: false
max_tool_message_tokens:
type: integer
description: "Maximum token count per tool message before compaction is applied. Tool messages exceeding this threshold will have their full content stored in external files with only a preview kept in context. Defaults to 2000."
required: false
group_token_threshold:
type: integer
description: "Maximum token count per compression group when using LLM-based compression. If None or 0, all messages are compressed in a single group. Messages exceeding this threshold individually will form their own group. Only used in 'compress' or 'auto' mode."
required: false
keep_recent_count:
type: integer
description: "Number of recent messages to preserve without compression or compaction. These messages remain unchanged to maintain conversation context. Defaults to 1 for compaction and 2 for compression."
required: false
store_dir:
type: string
description: "Directory path for storing offloaded message content. Full tool message content and compressed message groups are saved as files in this directory. Required for compaction and compression operations."
required: false
chat_id:
type: string
description: "Unique identifier for the chat session, used for file naming when storing compressed message groups. If not provided, a UUID will be generated automatically."
required: false


llm:
default:
backend: openai_compatible
@@ -164,9 +241,7 @@
temperature: 0.6
token_count: # Optional
model_name: Qwen/Qwen3-30B-A3B-Instruct-2507
backend: hf
params:
use_mirror: true
backend: base

qwen3_30b_instruct:
backend: openai_compatible
14 changes: 14 additions & 0 deletions reme_ai/context/__init__.py
@@ -0,0 +1,14 @@
"""Context management module for ReMe framework.

This module provides submodules for different types of context management operations:
- file_tool: File-related operations for reading, writing, and searching files
- offload: Context offload operations for reducing token usage and managing context windows
"""

from . import file_tool
from . import offload

__all__ = [
"file_tool",
"offload",
]
19 changes: 19 additions & 0 deletions reme_ai/context/offload/__init__.py
@@ -0,0 +1,19 @@
"""Context offload package for ReMe framework.

This package provides context management operations that can be used in LLM-powered flows
to reduce token usage and manage context window limits. It includes ready-to-use operations for:

- ContextCompactOp: Compact tool messages by storing full content in external files
- ContextCompressOp: Compress conversation history using LLM to generate concise summaries
- ContextOffloadOp: Orchestrate compaction and compression to reduce token usage
"""

from .context_compact_op import ContextCompactOp
from .context_compress_op import ContextCompressOp
from .context_offload_op import ContextOffloadOp

__all__ = [
"ContextCompactOp",
"ContextCompressOp",
"ContextOffloadOp",
]
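
A quick sketch of how these exports line up with the flows declared in default.yaml. The `>>` composition there is declared as a YAML string, so this fragment only instantiates the op and notes the handoff; treating `ContextOffloadOp()` as argument-free is an assumption.

```python
# Minimal sketch, assuming ContextOffloadOp needs no constructor arguments.
from reme_ai.context.offload import ContextOffloadOp

op = ContextOffloadOp()
# default.yaml composes this op as `ContextOffloadOp() >> BatchWriteFileOp()`:
# in "compact"/"auto" mode the op collects {file_path: full_content} in the
# flow context (write_file_dict), and the downstream BatchWriteFileOp
# persists those files under store_dir.
```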
126 changes: 60 additions & 66 deletions reme_ai/context/offload/context_compact_op.py
@@ -6,9 +6,7 @@
This helps manage context window limits while preserving important information.
"""

import json
from pathlib import Path
from typing import List
from uuid import uuid4

from flowllm.core.context import C
@@ -28,36 +26,6 @@ class ContextCompactOp(BaseAsyncOp):
This helps manage context window limits while preserving recent tool messages.
"""

def __init__(
self,
all_token_threshold: int = 20000,
tool_token_threshold: int = 2000,
tool_left_char_len: int = 100,
keep_recent: int = 1,
storage_path: str = "./",
exclude_tools: List[str] = None,
**kwargs,
):
"""
Initialize the context compaction operation.

Args:
all_token_threshold: Maximum total token count before compaction is triggered.
tool_token_threshold: Maximum token count for a single tool message before it's compressed.
tool_left_char_len: Number of characters to keep in the compressed tool message preview.
keep_recent: Number of recent tool messages to keep uncompressed.
storage_path: Directory path where compressed tool message contents will be stored.
exclude_tools: List of tool names to exclude from compaction (not currently used).
**kwargs: Additional arguments passed to the base class.
"""
super().__init__(**kwargs)
self.all_token_threshold: int = all_token_threshold
self.tool_token_threshold: int = tool_token_threshold
self.tool_left_char_len: int = tool_left_char_len
self.keep_recent: int = keep_recent
self.storage_path: Path = Path(storage_path)
self.exclude_tools: List[str] = exclude_tools

async def async_execute(self):
"""
Execute the context compaction operation.
@@ -70,41 +38,44 @@ async def async_execute(self):
- Storing full content in external files
- Preserving recent tool messages
"""
# Get configuration from context
max_total_tokens: int = self.context.get("max_total_tokens", 20000)
max_tool_message_tokens: int = self.context.get("max_tool_message_tokens", 2000)
preview_char_length: int = self.context.get("preview_char_length", 100)
keep_recent_count: int = self.context.get("keep_recent_count", 1)
store_dir: Path = Path(self.context.get("store_dir", ""))

assert max_total_tokens > 0, "max_total_tokens must be greater than 0"
assert max_tool_message_tokens > 0, "max_tool_message_tokens must be greater than 0"
assert preview_char_length >= 0, "preview_char_length must be non-negative"
assert keep_recent_count > 0, "keep_recent_count must be greater than 0"

# Convert context messages to Message objects
messages = [Message(**x) for x in self.context.messages]
messages_to_compress = [x for x in messages if x.role is not Role.SYSTEM][:-keep_recent_count]

# Calculate total token count
token_cnt: int = self.token_count(messages)
logger.info(f"Context compaction check: total token count={token_cnt}, threshold={self.all_token_threshold}")

# If token count is within threshold, no compaction needed
if token_cnt <= self.all_token_threshold:
# If nothing to compress after filtering, return original messages
if not messages_to_compress:
self.context.response.answer = self.context.messages
logger.info(
f"Token count ({token_cnt}) is within threshold ({self.all_token_threshold}), no compaction needed",
)
logger.info("No messages to compress after filtering, returning original messages")
return

# Filter tool messages for processing
tool_messages = [x for x in messages if x.role is Role.TOOL]
logger.info(f"{len(messages_to_compress)} messages remaining for compression check")

# Calculate total token count
compact_token_cnt: int = self.token_count(messages_to_compress)
logger.info(f"Context compaction check: total token count={compact_token_cnt}, threshold={max_total_tokens}")

# If there are too few tool messages, no compaction needed
if len(tool_messages) <= self.keep_recent:
# If token count is within threshold, no compaction needed
if compact_token_cnt <= max_total_tokens:
self.context.response.answer = self.context.messages
logger.info(
f"Tool message count ({len(tool_messages)}) is less than or "
f"equal to keep_recent ({self.keep_recent}), no compaction needed",
)
logger.info(f"Token count ({compact_token_cnt}) is within ({max_total_tokens}), no compaction needed")
return

# Exclude recent tool messages from compaction (keep them intact)
tool_messages = tool_messages[: -self.keep_recent]
logger.info(
f"Processing {len(tool_messages)} tool messages for "
f"compaction (keeping {self.keep_recent} recent messages)",
)
# Filter tool messages for processing
tool_messages = [x for x in messages_to_compress if x.role is Role.TOOL]

# Dictionary to store file paths and their compressed content (for potential batch writing)
# Dictionary to store file paths and their full content (for potential batch writing)
write_file_dict = {}

# Process each tool message
@@ -113,33 +84,56 @@ async def async_execute(self):
tool_token_cnt = self.token_count([tool_message])

# Skip if token count is within threshold
if tool_token_cnt <= self.tool_token_threshold:
if tool_token_cnt <= max_tool_message_tokens:
logger.info(
f"Skipping tool message (tool_call_id={tool_message.tool_call_id}): "
f"token count ({tool_token_cnt}) is within threshold ({self.tool_token_threshold})",
f"token count ({tool_token_cnt}) is within threshold ({max_tool_message_tokens})",
)
continue

# Create compressed preview of the tool message content
compact_result = tool_message.content[: self.tool_left_char_len] + "..."
# Save original full content before modifying
original_content = tool_message.content

# Generate file name from tool_call_id or create a unique identifier
file_name = tool_message.tool_call_id or uuid4().hex
path = self.storage_path / f"{file_name}.txt"
store_path = store_dir / f"{file_name}.txt"

# Store the full content for batch writing
write_file_dict[store_path.as_posix()] = original_content

# Store the mapping for potential batch writing
write_file_dict[str(path)] = compact_result
# Create compressed preview of the tool message content
compact_result = original_content[:preview_char_length] + "..."

# Log the compaction action
logger.info(
f"Compacting tool message (tool_call_id={tool_message.tool_call_id}): "
f"token count={tool_token_cnt}, saving full content to {path}",
f"token count={tool_token_cnt}, saving full content to {store_path}",
)

# Update tool message content with preview and file reference
compact_result += f" (detailed result is stored in {path})"
compact_result += f" (detailed result is stored in {store_path})"
tool_message.content = compact_result

# Store write_file_dict in context for potential batch writing
if write_file_dict:
self.context.write_file_dict = write_file_dict

# Return the compacted messages as JSON
self.context.response.answer = json.dumps([x.simple_dump() for x in messages], ensure_ascii=False, indent=2)
self.context.response.answer = [x.simple_dump() for x in messages]
self.context.response.metadata["write_file_dict"] = write_file_dict

logger.info(f"Context compaction completed: {len(write_file_dict)} tool messages were compacted")

async def async_default_execute(self, e: Exception = None, **_kwargs):
"""Handle execution errors by returning original messages.

This method is called when an exception occurs during async_execute. It preserves
the original messages and marks the operation as unsuccessful.

Args:
e: The exception that occurred during execution, if any.
**_kwargs: Additional keyword arguments (unused but required by interface).
"""
self.context.response.answer = self.context.messages
self.context.response.success = False
self.context.response.metadata["error"] = str(e)
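
To make the compaction transform concrete, here is a small self-contained sketch of what happens to one oversized tool message inside the loop above. The helper name and sample values are invented; the real op additionally counts tokens, skips messages at or under `max_tool_message_tokens`, and defers the file write to a downstream batch writer.

```python
# Illustrative re-creation of ContextCompactOp's per-message transform.
# compact_tool_content and all sample values are hypothetical.
from pathlib import Path
from uuid import uuid4


def compact_tool_content(content: str, tool_call_id: str | None,
                         store_dir: Path, preview_char_length: int = 100):
    """Return (preview, store_path): the preview replaces the message
    content, and store_path is where the full content should be written."""
    file_name = tool_call_id or uuid4().hex
    store_path = store_dir / f"{file_name}.txt"
    preview = content[:preview_char_length] + "..."
    preview += f" (detailed result is stored in {store_path})"
    return preview, store_path


preview, path = compact_tool_content("x" * 5000, "call_abc123", Path("./context_store"))
# The op records {path: full_content} in write_file_dict instead of writing
# immediately, so BatchWriteFileOp can batch the disk I/O in one step.
```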