Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 65 additions & 6 deletions codemcp/async_file_utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,73 @@
#!/usr/bin/env python3

import os
from typing import List
from typing import List, Literal

import anyio

from .line_endings import detect_line_endings

# Define OpenTextMode and OpenBinaryMode similar to what anyio uses
OpenTextMode = Literal[
"r",
"r+",
"+r",
"rt",
"rt+",
"r+t",
"+rt",
"tr",
"tr+",
"t+r",
"w",
"w+",
"+w",
"wt",
"wt+",
"w+t",
"+wt",
"tw",
"tw+",
"t+w",
"a",
"a+",
"+a",
"at",
"at+",
"a+t",
"+at",
"ta",
"ta+",
"t+a",
]
OpenBinaryMode = Literal[
"rb",
"rb+",
"r+b",
"+rb",
"br",
"br+",
"b+r",
"wb",
"wb+",
"w+b",
"+wb",
"bw",
"bw+",
"b+w",
"ab",
"ab+",
"a+b",
"+ab",
"ba",
"ba+",
"b+a",
]


async def async_open_text(
file_path: str,
mode: str = "r",
mode: OpenTextMode = "r",
encoding: str = "utf-8",
errors: str = "replace",
) -> str:
Expand All @@ -31,7 +88,7 @@ async def async_open_text(
return await f.read()


async def async_open_binary(file_path: str, mode: str = "rb") -> bytes:
async def async_open_binary(file_path: str, mode: OpenBinaryMode = "rb") -> bytes:
"""Asynchronously open and read a binary file.

Args:
Expand Down Expand Up @@ -67,7 +124,7 @@ async def async_readlines(
async def async_write_text(
file_path: str,
content: str,
mode: str = "w",
mode: OpenTextMode = "w",
encoding: str = "utf-8",
) -> None:
"""Asynchronously write text to a file.
Expand All @@ -84,15 +141,17 @@ async def async_write_text(
await f.write(content)


async def async_write_binary(file_path: str, content: bytes, mode: str = "wb") -> None:
async def async_write_binary(
file_path: str, content: bytes, mode: OpenBinaryMode = "wb"
) -> None:
"""Asynchronously write binary data to a file.

Args:
file_path: The path to the file
content: The binary content to write
mode: The file open mode (default: 'wb')
"""
async with await anyio.open_file(file_path, mode, newline="") as f:
async with await anyio.open_file(file_path, mode) as f:
await f.write(content)


Expand Down
10 changes: 6 additions & 4 deletions codemcp/code_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import os
import subprocess
from typing import List, Optional
from typing import List, Optional, Union

import tomli

Expand Down Expand Up @@ -92,7 +92,7 @@ async def run_code_command(
command_name: str,
command: List[str],
commit_message: str,
chat_id: str = None,
chat_id: Optional[str] = None,
) -> str:
"""Run a code command (lint, format, etc.) and handle git operations.

Expand Down Expand Up @@ -131,10 +131,11 @@ async def run_code_command(
# If it's a git repo, commit any pending changes before running the command
if is_git_repo:
logging.info(f"Committing any pending changes before {command_name}")
chat_id_str = str(chat_id) if chat_id is not None else ""
commit_result = await commit_changes(
full_dir_path,
f"Snapshot before auto-{command_name}",
chat_id,
chat_id_str,
commit_all=True,
)
if not commit_result[0]:
Expand All @@ -160,8 +161,9 @@ async def run_code_command(
has_changes = await check_for_changes(full_dir_path)
if has_changes:
logging.info(f"Changes detected after {command_name}, committing")
chat_id_str = str(chat_id) if chat_id is not None else ""
success, commit_result_message = await commit_changes(
full_dir_path, commit_message, chat_id, commit_all=True
full_dir_path, commit_message, chat_id_str, commit_all=True
)

if success:
Expand Down
16 changes: 12 additions & 4 deletions codemcp/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3

import os
from typing import List, Union

# Constants
MAX_LINES_TO_READ = 1000
Expand Down Expand Up @@ -78,15 +79,15 @@ def get_edit_snippet(
snippet_lines = edited_lines[start_line:end_line]

# Format with line numbers
result = []
result: List[str] = []
for i, line in enumerate(snippet_lines):
line_num = start_line + i + 1
result.append(f"{line_num:4d} | {line}")

return "\n".join(result)


def truncate_output_content(content: str, prefer_end: bool = True) -> str:
def truncate_output_content(content: Union[str, bytes], prefer_end: bool = True) -> str:
"""Truncate command output content to a reasonable size.

When prefer_end is True, this function prioritizes keeping content from the end
Expand All @@ -101,15 +102,22 @@ def truncate_output_content(content: str, prefer_end: bool = True) -> str:
The truncated content with appropriate indicators
"""
if not content:
return content
return "" if content is None else str(content)

# Convert bytes to str if needed
if isinstance(content, bytes):
try:
content = content.decode("utf-8")
except UnicodeDecodeError:
return "[Binary content cannot be displayed]"

lines = content.splitlines()
total_lines = len(lines)

# If number of lines is within the limit, check individual line lengths
if total_lines <= MAX_LINES_TO_READ:
# Process line lengths
processed_lines = []
processed_lines: List[str] = []
for line in lines:
if len(line) > MAX_LINE_LENGTH:
processed_lines.append(line[:MAX_LINE_LENGTH] + "... (line truncated)")
Expand Down
11 changes: 7 additions & 4 deletions codemcp/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import logging
import os
from typing import Optional, Tuple

import anyio

from .access import check_edit_permission
from .git import commit_changes
from .line_endings import apply_line_endings, normalize_to_lf
from .async_file_utils import OpenTextMode

__all__ = [
"check_file_path_and_permissions",
Expand All @@ -18,7 +20,7 @@
]


async def check_file_path_and_permissions(file_path: str) -> tuple[bool, str | None]:
async def check_file_path_and_permissions(file_path: str) -> Tuple[bool, Optional[str]]:
"""Check if the file path is valid and has the necessary permissions.

Args:
Expand Down Expand Up @@ -110,7 +112,7 @@ def ensure_directory_exists(file_path: str) -> None:

async def async_open_text(
file_path: str,
mode: str = "r",
mode: OpenTextMode = "r",
encoding: str = "utf-8",
errors: str = "replace",
) -> str:
Expand All @@ -135,7 +137,7 @@ async def write_text_content(
file_path: str,
content: str,
encoding: str = "utf-8",
line_endings: str | None = None,
line_endings: Optional[str] = None,
) -> None:
"""Write text content to a file with specified encoding and line endings.

Expand All @@ -156,7 +158,8 @@ async def write_text_content(
ensure_directory_exists(file_path)

# Write the content using anyio
write_mode: OpenTextMode = "w"
async with await anyio.open_file(
file_path, "w", encoding=encoding, newline=""
file_path, write_mode, encoding=encoding, newline=""
) as f:
await f.write(final_content)
14 changes: 7 additions & 7 deletions codemcp/git_commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async def create_commit_reference(
text=True,
check=True,
)
tree_hash = tree_result.stdout.strip()
tree_hash = str(tree_result.stdout.strip())
else:
# Create an empty tree if no HEAD exists
empty_tree_result = await run_command(
Expand All @@ -102,7 +102,7 @@ async def create_commit_reference(
text=True,
check=True,
)
tree_hash = empty_tree_result.stdout.strip()
tree_hash = str(empty_tree_result.stdout.strip())

commit_message = commit_msg

Expand All @@ -116,7 +116,7 @@ async def create_commit_reference(
text=True,
check=True,
)
head_hash = head_hash_result.stdout.strip()
head_hash = str(head_hash_result.stdout.strip())
parent_arg = ["-p", head_hash]

# Create the commit object (with GPG signing explicitly disabled)
Expand All @@ -135,7 +135,7 @@ async def create_commit_reference(
text=True,
check=True,
)
commit_hash = commit_result.stdout.strip()
commit_hash = str(commit_result.stdout.strip())

ref_name = f"refs/codemcp/{chat_id}"

Expand Down Expand Up @@ -309,7 +309,7 @@ async def commit_changes(
text=True,
check=True,
)
tree_hash = tree_result.stdout.strip()
tree_hash = str(tree_result.stdout.strip())

# Get the commit message from the reference
ref_message_result = await run_command(
Expand All @@ -319,7 +319,7 @@ async def commit_changes(
text=True,
check=True,
)
ref_message = ref_message_result.stdout.strip()
ref_message = str(ref_message_result.stdout.strip())

# Create a new commit with the same tree as HEAD but message from the reference
# This effectively creates the commit without changing the working tree
Expand All @@ -339,7 +339,7 @@ async def commit_changes(
text=True,
check=True,
)
new_commit_hash = new_commit_result.stdout.strip()
new_commit_hash = str(new_commit_result.stdout.strip())

# Update HEAD to point to the new commit
await run_command(
Expand Down
8 changes: 4 additions & 4 deletions codemcp/git_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async def get_head_commit_message(directory: str) -> str:
text=True,
)

return result.stdout.strip()
return str(result.stdout.strip())


async def get_head_commit_hash(directory: str, short: bool = True) -> str:
Expand Down Expand Up @@ -73,7 +73,7 @@ async def get_head_commit_hash(directory: str, short: bool = True) -> str:
text=True,
)

return result.stdout.strip()
return str(result.stdout.strip())


async def get_head_commit_chat_id(directory: str) -> str | None:
Expand Down Expand Up @@ -150,7 +150,7 @@ async def get_repository_root(path: str) -> str:
text=True,
)

return result.stdout.strip()
return str(result.stdout.strip())


async def is_git_repository(path: str) -> bool:
Expand Down Expand Up @@ -207,7 +207,7 @@ async def get_ref_commit_chat_id(directory: str, ref_name: str) -> str | None:
capture_output=True,
text=True,
)
commit_message = message_result.stdout.strip()
commit_message = str(message_result.stdout.strip())

# Use regex to find the last occurrence of codemcp-id: XXX
# The pattern looks for "codemcp-id: " followed by any characters up to a newline or end of string
Expand Down
Loading
Loading