diff --git a/README.md b/README.md index b94668c9..f2cdd59d 100644 --- a/README.md +++ b/README.md @@ -924,6 +924,7 @@ The Mem0 Memory Tool supports three different backend configurations: | FILE_READ_DIFF_TYPE_DEFAULT | Default diff type for file comparisons | unified | | FILE_READ_USE_GIT_DEFAULT | Default setting for using git in time machine mode | true | | FILE_READ_NUM_REVISIONS_DEFAULT | Default number of revisions to show in time machine mode | 5 | +| FILE_READ_ENCODING_DEFAULT | Default text encoding for file operations | utf-8 | #### Browser Tool diff --git a/src/strands_tools/file_read.py b/src/strands_tools/file_read.py index d57143db..f1648dc8 100644 --- a/src/strands_tools/file_read.py +++ b/src/strands_tools/file_read.py @@ -122,6 +122,9 @@ from strands_tools.utils import console_util from strands_tools.utils.detect_language import detect_language +# Default text encoding for file operations +DEFAULT_TEXT_ENCODING = "utf-8" + # Document format mapping FORMAT_EXTENSIONS = { "pdf": [".pdf"], @@ -352,6 +355,11 @@ def split_path_list(path: str) -> List[str]: "description": "Search recursively in subdirectories (default: true)", "default": True, }, + "encoding": { + "type": "string", + "description": "Text file encoding (default: utf-8)", + "default": DEFAULT_TEXT_ENCODING, + }, }, "required": ["path", "mode"], } @@ -451,7 +459,7 @@ def create_rich_panel(content: str, title: Optional[str] = None, file_path: Opti ) -def get_file_stats(console, file_path: str) -> Dict[str, Any]: +def get_file_stats(console, file_path: str, encoding: str = DEFAULT_TEXT_ENCODING) -> Dict[str, Any]: """ Get file statistics including size, line count, and preview. @@ -460,6 +468,7 @@ def get_file_stats(console, file_path: str) -> Dict[str, Any]: Args: file_path: Path to the file + encoding: Text encoding to use when reading the file Returns: Dict[str, Any]: File statistics including size_bytes, line_count, @@ -472,7 +481,7 @@ def get_file_stats(console, file_path: str) -> Dict[str, Any]: "preview": "", } - with open(file_path, "r") as f: + with open(file_path, "r", encoding=encoding) as f: preview_lines = [] for i, line in enumerate(f): stats["line_count"] += 1 @@ -494,7 +503,13 @@ def get_file_stats(console, file_path: str) -> Dict[str, Any]: return stats -def read_file_lines(console: Console, file_path: str, start_line: int = 0, end_line: Optional[int] = None) -> List[str]: +def read_file_lines( + console: Console, + file_path: str, + start_line: int = 0, + end_line: Optional[int] = None, + encoding: str = DEFAULT_TEXT_ENCODING, +) -> List[str]: """ Read specific lines from file. @@ -505,6 +520,7 @@ def read_file_lines(console: Console, file_path: str, start_line: int = 0, end_l file_path: Path to the file start_line: First line to read (0-based) end_line: Last line to read (optional) + encoding: Text encoding to use when reading the file Returns: List[str]: List of lines read @@ -522,7 +538,7 @@ def read_file_lines(console: Console, file_path: str, start_line: int = 0, end_l raise ValueError(f"Path is not a file: {file_path}") try: - with open(file_path, "r") as f: + with open(file_path, "r", encoding=encoding) as f: all_lines = f.readlines() # Validate line numbers @@ -552,7 +568,9 @@ def read_file_lines(console: Console, file_path: str, start_line: int = 0, end_l raise -def read_file_chunk(console: Console, file_path: str, chunk_size: int, chunk_offset: int = 0) -> str: +def read_file_chunk( + console: Console, file_path: str, chunk_size: int, chunk_offset: int = 0, encoding: str = DEFAULT_TEXT_ENCODING +) -> str: """ Read a chunk of file from given offset. @@ -563,6 +581,7 @@ def read_file_chunk(console: Console, file_path: str, chunk_size: int, chunk_off file_path: Path to the file chunk_size: Number of bytes to read chunk_offset: Starting offset in bytes + encoding: Text encoding to use when reading the file Returns: str: Content read from file @@ -587,7 +606,7 @@ def read_file_chunk(console: Console, file_path: str, chunk_size: int, chunk_off if chunk_size < 0: raise ValueError(f"Invalid chunk_size: {chunk_size}") - with open(file_path, "r") as f: + with open(file_path, "r", encoding=encoding) as f: f.seek(chunk_offset) content = f.read(chunk_size) @@ -630,7 +649,9 @@ def read_file_chunk(console: Console, file_path: str, chunk_size: int, chunk_off raise -def search_file(console: Console, file_path: str, pattern: str, context_lines: int = 2) -> List[Dict[str, Any]]: +def search_file( + console: Console, file_path: str, pattern: str, context_lines: int = 2, encoding: str = DEFAULT_TEXT_ENCODING +) -> List[Dict[str, Any]]: """ Search file for pattern and return matches with context. @@ -641,6 +662,7 @@ def search_file(console: Console, file_path: str, pattern: str, context_lines: i file_path: Path to the file pattern: Text pattern to search for context_lines: Number of lines of context around matches + encoding: Text encoding to use when reading the file Returns: List[Dict[str, Any]]: List of matches with line number and context @@ -662,7 +684,7 @@ def search_file(console: Console, file_path: str, pattern: str, context_lines: i results = [] try: - with open(file_path, "r") as f: + with open(file_path, "r", encoding=encoding) as f: lines = f.readlines() total_matches = 0 @@ -722,7 +744,9 @@ def search_file(console: Console, file_path: str, pattern: str, context_lines: i raise -def create_diff(file_path: str, comparison_path: str, diff_type: str = "unified") -> str: +def create_diff( + file_path: str, comparison_path: str, diff_type: str = "unified", encoding: str = DEFAULT_TEXT_ENCODING +) -> str: """ Create a diff between two files or directories. @@ -733,6 +757,7 @@ def create_diff(file_path: str, comparison_path: str, diff_type: str = "unified" file_path: Path to the first file/directory comparison_path: Path to the second file/directory diff_type: Type of diff view ('unified' is currently supported) + encoding: Text encoding to use when reading files Returns: str: Formatted diff output @@ -749,7 +774,7 @@ def create_diff(file_path: str, comparison_path: str, diff_type: str = "unified" # Function to read file content def read_file(path: str) -> List[str]: - with open(path, "r", encoding="utf-8") as f: + with open(path, "r", encoding=encoding) as f: return f.readlines() # Handle directory comparison @@ -978,6 +1003,7 @@ def file_read(tool: ToolUse, **kwargs: Any) -> ToolResult: - path: Path(s) to file(s). For multiple files, use comma-separated list. Can include wildcards like '*.py' or directories. - mode: Reading mode to use (required) + - encoding: Text encoding to use when reading files (default: utf-8) - Additional parameters specific to each mode **kwargs: Additional keyword arguments @@ -1009,6 +1035,7 @@ def file_read(tool: ToolUse, **kwargs: Any) -> ToolResult: file_read_diff_type_default = os.getenv("FILE_READ_DIFF_TYPE_DEFAULT", "unified") file_read_use_git_default = os.getenv("FILE_READ_USE_GIT_DEFAULT", "true").lower() == "true" file_read_num_revisions_default = int(os.getenv("FILE_READ_NUM_REVISIONS_DEFAULT", "5")) + file_read_encoding_default = os.getenv("FILE_READ_ENCODING_DEFAULT", DEFAULT_TEXT_ENCODING) try: # Validate required parameters @@ -1022,6 +1049,7 @@ def file_read(tool: ToolUse, **kwargs: Any) -> ToolResult: mode = tool_input["mode"] paths = split_path_list(tool_input["path"]) # Handle comma-separated paths recursive = tool_input.get("recursive", file_read_recursive_default) + encoding = tool_input.get("encoding") or file_read_encoding_default # Find all matching files across all paths matching_files = [] @@ -1123,7 +1151,7 @@ def file_read(tool: ToolUse, **kwargs: Any) -> ToolResult: try: if mode == "view": try: - with open(file_path, "r") as f: + with open(file_path, "r", encoding=encoding) as f: content = f.read() # Create rich panel with syntax highlighting @@ -1140,8 +1168,8 @@ def file_read(tool: ToolUse, **kwargs: Any) -> ToolResult: response_content.append({"text": error_msg}) elif mode == "preview": - stats = get_file_stats(console, file_path) - with open(file_path, "r") as f: + stats = get_file_stats(console, file_path, encoding) + with open(file_path, "r", encoding=encoding) as f: content = "".join(f.readlines()[:50]) preview_panel = create_rich_panel( @@ -1163,8 +1191,8 @@ def file_read(tool: ToolUse, **kwargs: Any) -> ToolResult: ) elif mode == "stats": - stats = get_file_stats(console, file_path) - response_content.append({"text": json.dumps(stats, indent=2)}) + stats = get_file_stats(console, file_path, encoding) + response_content.append({"text": json.dumps(stats, indent=2, ensure_ascii=False)}) elif mode == "lines": lines = read_file_lines( @@ -1172,6 +1200,7 @@ def file_read(tool: ToolUse, **kwargs: Any) -> ToolResult: file_path, tool_input.get("start_line", file_read_start_line_default), tool_input.get("end_line"), + encoding, ) response_content.append({"text": "".join(lines)}) @@ -1181,6 +1210,7 @@ def file_read(tool: ToolUse, **kwargs: Any) -> ToolResult: file_path, tool_input.get("chunk_size", 1024), tool_input.get("chunk_offset", file_read_chunk_offset_default), + encoding, ) response_content.append({"text": content}) @@ -1190,6 +1220,7 @@ def file_read(tool: ToolUse, **kwargs: Any) -> ToolResult: file_path, tool_input.get("search_pattern", ""), tool_input.get("context_lines", file_read_context_lines_default), + encoding, ) response_content.extend([{"text": r["context"]} for r in results]) @@ -1202,6 +1233,7 @@ def file_read(tool: ToolUse, **kwargs: Any) -> ToolResult: file_path, os.path.expanduser(comparison_path), tool_input.get("diff_type", file_read_diff_type_default), + encoding, ) diff_panel = create_rich_panel( diff --git a/tests/test_file_read.py b/tests/test_file_read.py index 7dc6c73d..0c5a9a73 100644 --- a/tests/test_file_read.py +++ b/tests/test_file_read.py @@ -323,3 +323,51 @@ def test_file_read_error_message_brackets(): result = file_read.file_read(tool=tool_use) assert result["status"] == "error" + + +@pytest.mark.parametrize( + "mode", + ["view", "preview", "stats", "lines", "chunk", "search", "diff"], +) +@pytest.mark.parametrize( + "tool_encoding,file_encoding,content", + [ + (None, "utf-8", "Hello 世界"), + ("utf-8", "utf-8", "Hello 世界"), + ("cp932", "cp932", "Hello 世界"), + ("latin-1", "latin-1", "Hello world"), + ], +) +def test_file_read_encoding(tmp_path, mode, tool_encoding, file_encoding, content): + """Test file read with text encoding option.""" + # Create test file with specified encoding + file1_path = tmp_path / "test1.txt" + file1_path.write_text(content, encoding=file_encoding) + + tool_input = { + "path": str(file1_path), + "mode": mode, + "encoding": tool_encoding, + } + expected_content = content + + if mode == "search": + expected_content = content.split()[-1] # Use the last word from content + tool_input["search_pattern"] = expected_content + + if mode == "diff": + # Create a second file for diffing + file2_path = tmp_path / "test2.md" + file2_path.write_text("Different content", encoding=file_encoding) + tool_input["comparison_path"] = str(file2_path) + + tool_use = { + "toolUseId": "test-tool-use-id", + "input": tool_input, + } + + result = file_read.file_read(tool=tool_use) + result_text = extract_result_text(result) + + assert result["status"] == "success" + assert expected_content in result_text