diff --git a/backend/app/mcp_server/server.py b/backend/app/mcp_server/server.py index d5f1fd3a1..2fe870686 100644 --- a/backend/app/mcp_server/server.py +++ b/backend/app/mcp_server/server.py @@ -207,6 +207,7 @@ def _register_knowledge_tools() -> None: # The decorators will add tools to the global registry from app.mcp_server.tool_registry import register_tools_to_server from app.mcp_server.tools import ( # noqa: F401 side-effect: triggers @mcp_tool registration + dingtalk_docs, knowledge, ) diff --git a/backend/app/mcp_server/tools/dingtalk_docs.py b/backend/app/mcp_server/tools/dingtalk_docs.py new file mode 100644 index 000000000..8edb8bf4b --- /dev/null +++ b/backend/app/mcp_server/tools/dingtalk_docs.py @@ -0,0 +1,416 @@ +# SPDX-FileCopyrightText: 2025 WeCode, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +""" +DingTalk Document MCP tools for knowledge base integration. + +This module provides MCP tool implementations for adding DingTalk documents +to Wegent knowledge bases. The tools coordinate with sandbox execution to: +1. Download DingTalk document content +2. Save with name {title}.{file_extension} +3. Upload as attachment +4. Create knowledge base document + +These tools are registered with the MCP server and exposed to AI agents. +""" + +import logging +from typing import Any, Dict, Optional + +from app.db.session import SessionLocal +from app.mcp_server.auth import TaskTokenInfo +from app.mcp_server.tools.decorator import build_mcp_tools_dict, mcp_tool +from app.mcp_server.tools.knowledge import _get_user_from_token +from app.services.dingtalk.docs_service import dingtalk_docs_service +from app.services.knowledge.orchestrator import knowledge_orchestrator +from shared.telemetry.decorators import trace_async, trace_sync + +logger = logging.getLogger(__name__) + + +@mcp_tool( + name="get_dingtalk_document_info", + description="Get information about a DingTalk document from its URL.", + server="knowledge", + param_descriptions={ + "doc_url": "DingTalk document URL (e.g., https://alidocs.dingtalk.com/i/nodes/xxx)", + }, +) +@trace_async( + span_name="mcp.get_dingtalk_document_info", + tracer_name="mcp", + extract_attributes=lambda token_info, doc_url: { + "doc_url": doc_url, + }, +) +async def get_dingtalk_document_info( + token_info: TaskTokenInfo, + doc_url: str, +) -> Dict[str, Any]: + """ + Get DingTalk document information including title and modification time. + + Args: + token_info: Task token information containing user context + doc_url: DingTalk document URL + + Returns: + Dict with document info: + - doc_id: Document ID + - title: Document title + - modified_time: ISO format modification time + - content_type: Content type + - url: Original URL + """ + try: + # Get user preferences for MCP config + db = SessionLocal() + try: + user = _get_user_from_token(db, token_info) + user_preferences = user.preferences if user else None + finally: + db.close() + + doc_info = await dingtalk_docs_service.get_document_info( + doc_url, user_preferences=user_preferences + ) + return { + "success": True, + "doc_id": doc_info["doc_id"], + "title": doc_info["title"], + "modified_time": doc_info["modified_time"], + "content_type": doc_info["content_type"], + "url": doc_info["url"], + } + except ValueError as e: + logger.warning(f"[MCP] get_dingtalk_document_info validation error: {e}") + return {"success": False, "error": str(e)} + except Exception as e: + logger.error(f"[MCP] get_dingtalk_document_info error: {e}", exc_info=True) + return {"success": False, "error": f"Failed to get document info: {e}"} + + +@mcp_tool( + name="add_dingtalk_doc_to_knowledge", + description="Add a DingTalk document to Wegent knowledge base. Downloads the document, saves it with naming {title}.{file_extension}, and creates a knowledge base document.", + server="knowledge", + param_descriptions={ + "knowledge_base_id": "Target knowledge base ID", + "doc_url": "DingTalk document URL", + "doc_title": "Document title (optional, will be fetched from DingTalk if not provided)", + "doc_content": "Document content (optional, will be downloaded from DingTalk if not provided)", + "modified_time": "Document modification time in YYYYMMDDHHMMSS format (optional)", + "trigger_indexing": "Whether to trigger RAG indexing (default: True)", + "trigger_summary": "Whether to trigger summary generation (default: True)", + }, +) +@trace_async( + span_name="mcp.add_dingtalk_doc_to_knowledge", + tracer_name="mcp", + extract_attributes=lambda token_info, knowledge_base_id, doc_url, **kwargs: { + "knowledge_base_id": knowledge_base_id, + "doc_url": doc_url, + "doc_title": kwargs.get("doc_title"), + }, +) +async def add_dingtalk_doc_to_knowledge( + token_info: TaskTokenInfo, + knowledge_base_id: int, + doc_url: str, + doc_title: Optional[str] = None, + doc_content: Optional[str] = None, + modified_time: Optional[str] = None, + trigger_indexing: bool = True, + trigger_summary: bool = True, +) -> Dict[str, Any]: + """ + Add a DingTalk document to Wegent knowledge base. + + This tool creates a knowledge base document from a DingTalk document. + The document can be provided directly via parameters or fetched from DingTalk. + + File name: {title}.{file_extension} + + Args: + token_info: Task token information containing user context + knowledge_base_id: Target knowledge base ID + doc_url: DingTalk document URL (for reference and metadata) + doc_title: Document title (optional) + doc_content: Document content (optional, markdown format preferred) + modified_time: Modification time in YYYYMMDDHHMMSS format (optional) + trigger_indexing: Whether to trigger RAG indexing + trigger_summary: Whether to trigger summary generation + + Returns: + Dict with operation result: + - success: Whether the operation succeeded + - document_id: Created document ID + - document_name: Document name + - message: Status message + """ + db = SessionLocal() + try: + user = _get_user_from_token(db, token_info) + if not user: + return {"success": False, "error": "User not found"} + + # Variables to store document metadata from DingTalk + file_extension = "" + update_time = None + + # If content not provided, fetch from DingTalk + if not doc_content: + logger.info( + f"[MCP] Content not provided, fetching from DingTalk: {doc_url}" + ) + try: + # Get user preferences for MCP config + user_preferences = user.preferences if user else None + doc_download = await dingtalk_docs_service.download_document_content( + doc_url, user_preferences=user_preferences + ) + doc_content = doc_download.get("content", "") + # Use fetched title if not provided + if not doc_title: + doc_title = doc_download.get("title", "DingTalk Document") + # Use fetched modified_time if not provided + if not modified_time: + modified_time = doc_download.get("modified_time_formatted") + # Get file_extension from DingTalk response + file_extension = doc_download.get("file_extension", "") + # Get original updateTime for source_config + update_time = doc_download.get("modified_time") + except Exception as e: + logger.error(f"[MCP] Failed to fetch document from DingTalk: {e}") + return { + "success": False, + "error": f"Failed to fetch document from DingTalk: {e}", + } + + if not doc_content: + return { + "success": False, + "error": "Failed to get document content from DingTalk", + } + + # Use provided title or extract from URL + title = doc_title or "DingTalk Document" + + # Build filename: {title}.{file_extension} + from app.services.knowledge.orchestrator import _build_filename + + filename = _build_filename(title, file_extension) + + logger.info( + f"[MCP] Adding DingTalk doc to KB {knowledge_base_id}: " + f"title='{title}', filename='{filename}'" + ) + + # Build source_config with DingTalk document metadata + source_config = { + "url": doc_url, + "source": "dingtalk-connector", + "updated_at": update_time or modified_time, + } + + # Create document with text content + # The content is expected to be markdown from DingTalk + # Note: create_document_with_content is a sync function, call directly + # instead of using asyncio.to_thread to avoid passing Session/User across + # thread boundaries (SQLAlchemy objects are not thread-safe) + result = knowledge_orchestrator.create_document_with_content( + db=db, + user=user, + knowledge_base_id=knowledge_base_id, + name=filename, + source_type="text", + content=doc_content, + file_extension=file_extension, + trigger_indexing=trigger_indexing, + trigger_summary=trigger_summary, + source_config=source_config, + ) + + return { + "success": True, + "document_id": result.id, + "document_name": result.name, + "filename": filename, + "message": f"Document '{title}' added to knowledge base successfully", + } + + except ValueError as e: + logger.warning(f"[MCP] add_dingtalk_doc_to_knowledge validation error: {e}") + return {"success": False, "error": str(e)} + except Exception as e: + logger.error(f"[MCP] add_dingtalk_doc_to_knowledge error: {e}", exc_info=True) + return {"success": False, "error": f"Failed to add document: {e}"} + finally: + db.close() + + +@mcp_tool( + name="download_dingtalk_document", + description="Download a DingTalk document's content from its URL. Returns the document content, title, and metadata without creating a knowledge base document.", + server="knowledge", + param_descriptions={ + "doc_url": "DingTalk document URL (e.g., https://alidocs.dingtalk.com/i/nodes/xxx)", + }, +) +@trace_async( + span_name="mcp.download_dingtalk_document", + tracer_name="mcp", + extract_attributes=lambda token_info, doc_url: { + "doc_url": doc_url, + }, +) +async def download_dingtalk_document( + token_info: TaskTokenInfo, + doc_url: str, +) -> Dict[str, Any]: + """ + Download a DingTalk document's content from its URL. + + This tool downloads the document content from DingTalk and returns it + without creating a knowledge base document. Useful for skills that need + to process the content before adding it to a knowledge base. + + Args: + token_info: Task token information containing user context + doc_url: DingTalk document URL + + Returns: + Dict with document content and metadata: + - success: Whether the operation succeeded + - doc_id: Document ID + - title: Document title + - content: Document content (markdown format) + - file_extension: File extension (e.g., "md") + - modified_time: ISO format modification time + - error: Error message if failed + """ + try: + # Get user preferences for MCP config + db = SessionLocal() + try: + user = _get_user_from_token(db, token_info) + user_preferences = user.preferences if user else None + finally: + db.close() + + # Download document content + doc_download = await dingtalk_docs_service.download_document_content( + doc_url, user_preferences=user_preferences + ) + + return { + "success": True, + "doc_id": doc_download.get("doc_id"), + "title": doc_download.get("title", "DingTalk Document"), + "content": doc_download.get("content", ""), + "file_extension": doc_download.get("file_extension", ""), + "modified_time": doc_download.get("modified_time"), + } + except ValueError as e: + logger.warning(f"[MCP] download_dingtalk_document validation error: {e}") + return {"success": False, "error": str(e)} + except Exception as e: + logger.error(f"[MCP] download_dingtalk_document error: {e}", exc_info=True) + return {"success": False, "error": f"Failed to download document: {e}"} + + +@mcp_tool( + name="add_dingtalk_doc_with_attachment", + description="Add a DingTalk document to knowledge base using an existing attachment. This is used after the skill uploads the document as an attachment.", + server="knowledge", + param_descriptions={ + "knowledge_base_id": "Target knowledge base ID", + "doc_title": "Document title", + "attachment_id": "Existing attachment ID from upload_attachment tool", + "trigger_indexing": "Whether to trigger RAG indexing (default: True)", + "trigger_summary": "Whether to trigger summary generation (default: True)", + }, +) +@trace_sync( + span_name="mcp.add_dingtalk_doc_with_attachment", + tracer_name="mcp", + extract_attributes=lambda token_info, knowledge_base_id, doc_title, attachment_id, **kwargs: { + "knowledge_base_id": knowledge_base_id, + "doc_title": doc_title, + "attachment_id": attachment_id, + }, +) +def add_dingtalk_doc_with_attachment( + token_info: TaskTokenInfo, + knowledge_base_id: int, + doc_title: str, + attachment_id: int, + trigger_indexing: bool = True, + trigger_summary: bool = True, +) -> Dict[str, Any]: + """ + Add a DingTalk document to knowledge base using an existing attachment. + + This tool is designed to work with the dingtalk-connector skill which: + 1. Downloads the DingTalk document in sandbox + 2. Saves it as {title}.{file_extension} + 3. Uploads it as an attachment + 4. Calls this tool to create the knowledge base document + + Args: + token_info: Task token information containing user context + knowledge_base_id: Target knowledge base ID + doc_title: Document title + attachment_id: Attachment ID from upload_attachment tool + trigger_indexing: Whether to trigger RAG indexing + trigger_summary: Whether to trigger summary generation + + Returns: + Dict with operation result + """ + db = SessionLocal() + try: + user = _get_user_from_token(db, token_info) + if not user: + return {"success": False, "error": "User not found"} + + logger.info( + f"[MCP] Adding DingTalk doc with attachment to KB {knowledge_base_id}: " + f"title='{doc_title}', attachment_id={attachment_id}" + ) + + # Create document with attachment reference + result = knowledge_orchestrator.create_document_with_content( + db=db, + user=user, + knowledge_base_id=knowledge_base_id, + name=doc_title, + source_type="attachment", + attachment_id=attachment_id, + trigger_indexing=trigger_indexing, + trigger_summary=trigger_summary, + ) + + return { + "success": True, + "document_id": result.id, + "document_name": result.name, + "attachment_id": attachment_id, + "message": f"Document '{doc_title}' added to knowledge base successfully", + } + + except ValueError as e: + logger.warning(f"[MCP] add_dingtalk_doc_with_attachment validation error: {e}") + return {"success": False, "error": str(e)} + except Exception as e: + logger.error( + f"[MCP] add_dingtalk_doc_with_attachment error: {e}", exc_info=True + ) + return {"success": False, "error": f"Failed to add document: {e}"} + finally: + db.close() + + +# Build tool registry from decorated functions +DINGTALK_DOCS_MCP_TOOLS = build_mcp_tools_dict(server="knowledge") diff --git a/backend/app/services/dingtalk/__init__.py b/backend/app/services/dingtalk/__init__.py new file mode 100644 index 000000000..c94f473c6 --- /dev/null +++ b/backend/app/services/dingtalk/__init__.py @@ -0,0 +1,15 @@ +# SPDX-FileCopyrightText: 2025 WeCode, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +"""DingTalk services package.""" + +from app.services.dingtalk.docs_service import ( + DingTalkDocsService, + dingtalk_docs_service, +) + +__all__ = [ + "DingTalkDocsService", + "dingtalk_docs_service", +] diff --git a/backend/app/services/dingtalk/docs_service.py b/backend/app/services/dingtalk/docs_service.py new file mode 100644 index 000000000..6d79b951b --- /dev/null +++ b/backend/app/services/dingtalk/docs_service.py @@ -0,0 +1,440 @@ +# SPDX-FileCopyrightText: 2025 WeCode, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +"""DingTalk Document Service. + +This module provides services for interacting with DingTalk documents via MCP, +including fetching document metadata, downloading document content, +and managing document operations. +""" + +import json +import logging +import re +from datetime import datetime +from typing import Any, Dict, Optional +from urllib.parse import urlparse + +import httpx + +from app.services.user_mcp_service import UserMCPService + +logger = logging.getLogger(__name__) + +# Constants +HTTP_TIMEOUT_SECONDS = 60.0 +AUTH_ERROR_CODES = (401, 403) +DOC_ID_PREVIEW_LENGTH = 8 + + +class DingTalkDocsService: + """Service for DingTalk document operations via MCP. + + Provides methods to: + - Parse DingTalk document URLs + - Fetch document metadata via dingtalk_docs MCP + - Download document content via dingtalk_docs MCP + """ + + def __init__(self) -> None: + """Initialize the DingTalk docs service.""" + pass + + def _extract_doc_id_from_url(self, url: str) -> Optional[str]: + """Extract document ID from DingTalk document URL. + + Supports various DingTalk document URL formats: + - https://alidocs.dingtalk.com/i/nodes/{doc_id} + - https://alidocs.dingtalk.com/i/team/{team_id}/docs/{doc_id} + - https://alidocs.dingtalk.com/i/team/{team_id}/wiki/{wiki_id} + + Args: + url: DingTalk document URL + + Returns: + Document ID if found, None otherwise + """ + if not url: + return None + + try: + parsed = urlparse(url) + except ValueError: + return None + + # Validate hostname is alidocs.dingtalk.com + if parsed.hostname != "alidocs.dingtalk.com": + return None + + path = parsed.path or "" + + # Pattern for /i/nodes/{doc_id} + node_pattern = r"^/i/nodes/([a-zA-Z0-9_-]+)" + match = re.search(node_pattern, path) + if match: + return match.group(1) + + # Pattern for /i/team/{team_id}/docs/{doc_id} + docs_pattern = r"^/i/team/[^/]+/docs/([a-zA-Z0-9_-]+)" + match = re.search(docs_pattern, path) + if match: + return match.group(1) + + # Pattern for /i/team/{team_id}/wiki/{wiki_id} + wiki_pattern = r"^/i/team/[^/]+/wiki/([a-zA-Z0-9_-]+)" + match = re.search(wiki_pattern, path) + if match: + return match.group(1) + + return None + + def _get_dingtalk_docs_mcp_config( + self, user_preferences: Optional[str] + ) -> Optional[Dict[str, Any]]: + """Get dingtalk_docs MCP server config for user. + + Args: + user_preferences: User's preferences JSON string + + Returns: + MCP server config dict with 'name', 'url', 'type' or None if not configured + """ + return UserMCPService.get_enabled_mcp_server( + user_preferences, + provider_id="dingtalk", + service_id="docs", + server_name="dingtalk_docs", + ) + + async def _call_dingtalk_mcp_tool( + self, + mcp_config: Dict[str, Any], + tool_name: str, + arguments: Dict[str, Any], + ) -> Dict[str, Any]: + """Call a tool on the dingtalk_docs MCP server. + + Args: + mcp_config: MCP server config with 'url' + tool_name: Name of the tool to call + arguments: Tool arguments + + Returns: + Tool result dict + + Raises: + ValueError: If MCP call fails + """ + url = mcp_config.get("url", "").rstrip("/") + if not url: + raise ValueError("dingtalk_docs MCP URL not configured") + + # Build MCP tool call payload + payload = { + "jsonrpc": "2.0", + "method": "tools/call", + "params": { + "name": tool_name, + "arguments": arguments, + }, + "id": 1, + } + + logger.info(f"Calling dingtalk_docs MCP tool: {tool_name}") + + try: + async with httpx.AsyncClient(timeout=HTTP_TIMEOUT_SECONDS) as client: + response = await client.post( + url, + json=payload, + headers={ + "Content-Type": "application/json", + "Accept": "application/json", + }, + ) + response.raise_for_status() + result = response.json() + except httpx.HTTPStatusError as e: + logger.error(f"HTTP error from DingTalk MCP: {e.response.status_code}") + logger.error(f"Response body: {e.response.text}") + if e.response.status_code in AUTH_ERROR_CODES: + raise ValueError( + "DingTalk authentication failed. Please check your MCP configuration." + ) + if e.response.status_code == 406: + raise ValueError( + f"DingTalk MCP request format not acceptable (HTTP 406). " + f"Response: {e.response.text}" + ) + raise ValueError( + f"Failed to call DingTalk MCP: HTTP {e.response.status_code}" + ) + + if "error" in result: + error_msg = result["error"].get("message", "Unknown MCP error") + # Check for authentication-related errors + if error_msg and any( + keyword in error_msg.lower() + for keyword in [ + "auth", + "authentication", + "unauthorized", + "permission", + "access", + ] + ): + raise ValueError( + f"DingTalk authentication required: {error_msg}\n" + f"Please ensure:\n" + f"1. You have access to this document in DingTalk\n" + f"2. The document is shared with you or is public\n" + f"3. Your DingTalk MCP configuration is correct" + ) + raise ValueError(f"MCP tool call failed: {error_msg}") + + # Extract tool result from content + content = result.get("result", {}).get("content", []) + if content and len(content) > 0: + text_content = content[0].get("text", "{}") + try: + return json.loads(text_content) + except json.JSONDecodeError: + return {"content": text_content} + + return {} + + async def get_document_info( + self, + doc_url: str, + user_preferences: Optional[str] = None, + ) -> Dict[str, Any]: + """Get DingTalk document information via MCP. + + Args: + doc_url: DingTalk document URL + user_preferences: User's preferences JSON string (for MCP config) + + Returns: + Dict containing document info: + - doc_id: Document ID + - title: Document title + - modified_time: ISO format modification time + - content_type: Content type + + Raises: + ValueError: If URL is invalid or document not found + """ + doc_id = self._extract_doc_id_from_url(doc_url) + if not doc_id: + raise ValueError(f"Invalid DingTalk document URL: {doc_url}") + + logger.info(f"Fetching document info for doc_id: {doc_id}") + + # Get MCP config + mcp_config = self._get_dingtalk_docs_mcp_config(user_preferences) + if not mcp_config: + raise ValueError( + "dingtalk_docs MCP not configured. " + "Please configure DingTalk Docs MCP in user settings." + ) + + # Call MCP tool to get document info + # Note: DingTalk MCP get_document_info only requires nodeId parameter + try: + result = await self._call_dingtalk_mcp_tool( + mcp_config, + tool_name="get_document_info", + arguments={"nodeId": doc_id}, + ) + + # Map MCP result to our format + # DingTalk MCP returns camelCase field names + # Note: 'name' is the document title, 'updateTime' is the modification time + title = result.get("name") + modified_time = result.get("updateTime") + if not title or not modified_time: + logger.error(f"MCP response fields: {list(result.keys())}") + raise ValueError("MCP response missing required document metadata") + + return { + "doc_id": doc_id, + "title": title, + "modified_time": modified_time, + "content_type": result.get("contentType", "markdown"), + "url": result.get("docUrl", doc_url), + } + + except Exception as e: + logger.error(f"Failed to get document info from MCP: {e}") + raise ValueError(f"Failed to get document info: {e}") from e + + async def download_document_content( + self, + doc_url: str, + user_preferences: Optional[str] = None, + export_format: str = "markdown", + ) -> Dict[str, Any]: + """Download DingTalk document content via MCP. + + According to DingTalk MCP documentation: + 1. First call get_document_info to get metadata (contentType, extension) + 2. Then choose the appropriate tool based on contentType and extension: + - contentType=ALIDOC, extension=adoc → get_document_content(nodeId) + - contentType=ALIDOC, extension=axls → dingtalk_table MCP + - contentType=ALIDOC, extension=able → dingtalk_ai_table MCP + - contentType≠ALIDOC and nodeType=file → download_file(nodeId) + + Args: + doc_url: DingTalk document URL + user_preferences: User's preferences JSON string (for MCP config) + export_format: Export format (markdown, html, txt) + + Returns: + Dict containing: + - content: Document content as string + - title: Document title + - modified_time: Modification time + - file_extension: Suggested file extension + + Raises: + ValueError: If download fails + """ + doc_id = self._extract_doc_id_from_url(doc_url) + if not doc_id: + raise ValueError(f"Invalid DingTalk document URL: {doc_url}") + + logger.info(f"Downloading document content for doc_id: {doc_id}") + + # Get MCP config + mcp_config = self._get_dingtalk_docs_mcp_config(user_preferences) + if not mcp_config: + raise ValueError( + "dingtalk_docs MCP not configured. " + "Please configure DingTalk Docs MCP in user settings." + ) + + try: + # Step 1: Get document info to determine content type + doc_info = await self._call_dingtalk_mcp_tool( + mcp_config, + tool_name="get_document_info", + arguments={"nodeId": doc_id}, + ) + + # DingTalk MCP returns 'name' as the document title + title = doc_info.get( + "name", f"DingTalkDoc_{doc_id[:DOC_ID_PREVIEW_LENGTH]}" + ) + content_type = doc_info.get("contentType", "") + extension = doc_info.get("extension", "") + node_type = doc_info.get("nodeType", "") + # DingTalk MCP returns 'updateTime' as the modification time (Unix timestamp in ms) + modified_time = doc_info.get("updateTime", datetime.now().isoformat()) + + logger.info( + f"Document info: contentType={content_type}, extension={extension}, nodeType={node_type}" + ) + + # Step 2: Choose appropriate tool based on contentType and extension + content = "" + + if content_type == "ALIDOC" and extension == "adoc": + # DingTalk online document - use get_document_content + result = await self._call_dingtalk_mcp_tool( + mcp_config, + tool_name="get_document_content", + arguments={"nodeId": doc_id}, + ) + logger.info(f"get_document_content result keys: {list(result.keys())}") + content = result.get("content", "") + if not content: + # Try other possible field names + content = result.get("markdown", "") or result.get("text", "") + + elif content_type == "ALIDOC" and extension == "axls": + # DingTalk spreadsheet - requires dingtalk_table MCP + raise ValueError( + "DingTalk spreadsheet documents are not supported yet. " + "Please use dingtalk_table MCP for spreadsheets." + ) + + elif content_type == "ALIDOC" and extension == "able": + # DingTalk AI table - requires dingtalk_ai_table MCP + raise ValueError( + "DingTalk AI table documents are not supported yet. " + "Please use dingtalk_ai_table MCP for AI tables." + ) + + elif content_type != "ALIDOC" and node_type == "file": + # Regular file - use download_file + result = await self._call_dingtalk_mcp_tool( + mcp_config, + tool_name="download_file", + arguments={"nodeId": doc_id}, + ) + # download_file returns a download link + download_url = result.get("downloadUrl", "") + if download_url: + # Fetch the file content from the download URL + async with httpx.AsyncClient( + timeout=HTTP_TIMEOUT_SECONDS + ) as client: + file_response = await client.get(download_url) + file_response.raise_for_status() + content = file_response.text + else: + raise ValueError("No download URL returned from DingTalk") + + else: + # Try get_document_content as fallback + logger.warning( + f"Unknown content type: {content_type}/{extension}, trying get_document_content" + ) + result = await self._call_dingtalk_mcp_tool( + mcp_config, + tool_name="get_document_content", + arguments={"nodeId": doc_id}, + ) + content = result.get("content", "") + + if not content: + raise ValueError("Empty content returned from DingTalk MCP") + + return { + "content": content, + "title": title, + "modified_time": modified_time, + "file_extension": ("md" if extension == "adoc" else extension or "md"), + "doc_id": doc_id, + "content_type": content_type, + "extension": extension, + } + + except httpx.HTTPStatusError as e: + logger.error( + f"HTTP error from DingTalk MCP: {e.response.status_code} - {e.response.text}" + ) + if e.response.status_code in AUTH_ERROR_CODES: + raise ValueError( + "DingTalk authentication required. Please ensure:\n" + "1. You have access to this document in DingTalk\n" + "2. The document is shared with you or is public\n" + "3. Your DingTalk MCP configuration is correct" + ) + raise ValueError( + f"Failed to download document from DingTalk: HTTP {e.response.status_code}" + ) + except Exception as e: + logger.error(f"Failed to download document from MCP: {e}") + error_msg = str(e) + if "authentication" in error_msg.lower() or "auth" in error_msg.lower(): + raise ValueError( + f"DingTalk authentication required: {error_msg}\n" + f"Please ensure you have access to this document in DingTalk." + ) + raise ValueError(f"Failed to download document: {e}") + + +# Singleton instance +dingtalk_docs_service = DingTalkDocsService() diff --git a/backend/app/services/knowledge/orchestrator.py b/backend/app/services/knowledge/orchestrator.py index ca52c2d86..b6635517b 100644 --- a/backend/app/services/knowledge/orchestrator.py +++ b/backend/app/services/knowledge/orchestrator.py @@ -1105,6 +1105,7 @@ def create_document_with_content( trigger_indexing: bool = True, trigger_summary: bool = True, splitter_config: Optional[Dict[str, Any]] = None, + source_config: Optional[Dict[str, Any]] = None, ) -> KnowledgeDocumentResponse: """ Create a document with complete workflow. @@ -1125,6 +1126,7 @@ def create_document_with_content( trigger_indexing: Whether to trigger RAG indexing trigger_summary: Whether to trigger summary generation splitter_config: Optional splitter configuration dict + source_config: Optional source configuration dict (e.g., {'url': '...', 'source': '...'}) Returns: KnowledgeDocumentResponse @@ -1228,6 +1230,7 @@ def create_document_with_content( attachment_id=attachment.id, file_extension=normalized_ext, file_size=len(binary_data), + source_config=source_config or {}, ) return self._create_and_index_document( @@ -1261,6 +1264,7 @@ def create_document_with_content( attachment_id=attachment.id, file_extension=normalized_ext, file_size=len(binary_data), + source_config=source_config or {}, ) return self._create_and_index_document( diff --git a/backend/init_data/skills/dingtalk-connector/SKILL.md b/backend/init_data/skills/dingtalk-connector/SKILL.md new file mode 100644 index 000000000..3ab8176c1 --- /dev/null +++ b/backend/init_data/skills/dingtalk-connector/SKILL.md @@ -0,0 +1,53 @@ +# SPDX-FileCopyrightText: 2025 WeCode, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +name: dingtalk-connector +description: | + Skill for adding DingTalk documents to Wegent knowledge bases. + + This skill provides tools to: + 1. Download DingTalk documents via MCP + 2. Save documents with name: {title}.{file_extension} + 3. Upload as attachments to Wegent + 4. Create knowledge base documents + + Usage: + - Use `dingtalk_doc_to_kb` tool to add a DingTalk document to a knowledge base + - The tool handles the entire workflow: download -> save -> upload -> create document + +version: 1.0.0 +author: Wegent Team + +mcp_servers: + wegent-knowledge: + type: streamable-http + url: "${backend_url}/mcp/knowledge/sse" + headers: + Authorization: "Bearer ${auth_token}" + timeout: 300 + +tools: + - name: dingtalk_doc_to_kb + provider: dingtalk-connector + description: | + Add a DingTalk document to Wegent knowledge base. + + This tool performs the complete workflow: + 1. Starts a sandbox environment + 2. Downloads the DingTalk document content via MCP + 3. Saves the document as {title}.{file_extension} + 4. Uploads the file as an attachment + 5. Creates a knowledge base document + + Parameters: + - dingtalk_doc_url: The DingTalk document URL + - knowledge_base_id: Target knowledge base ID + - doc_title: Document title (optional, fetched from DingTalk if not provided) + + Returns: + - success: Whether the operation succeeded + - document_id: Created document ID + - document_name: Document name + - attachment_id: Attachment ID + - message: Status message diff --git a/backend/init_data/skills/dingtalk-connector/__init__.py b/backend/init_data/skills/dingtalk-connector/__init__.py new file mode 100644 index 000000000..6af448a91 --- /dev/null +++ b/backend/init_data/skills/dingtalk-connector/__init__.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: 2025 WeCode, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +"""DingTalk Docs Skill Package. + +This skill provides tools for adding DingTalk documents to Wegent knowledge bases. +""" + +from chat_shell.skills import SkillToolProvider + +from .provider import DingTalkDocsToolProvider + +__all__ = ["DingTalkDocsToolProvider"] diff --git a/backend/init_data/skills/dingtalk-connector/provider.py b/backend/init_data/skills/dingtalk-connector/provider.py new file mode 100644 index 000000000..7e923e73c --- /dev/null +++ b/backend/init_data/skills/dingtalk-connector/provider.py @@ -0,0 +1,946 @@ +# SPDX-FileCopyrightText: 2025 WeCode, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +"""DingTalk Docs Tool Provider. + +This module provides the DingTalkDocsToolProvider class that creates +tools for adding DingTalk documents to Wegent knowledge bases. + +The tool executes in a sandbox environment to: +1. Download DingTalk document content via MCP +2. Save with name: {title}.{file_extension} +3. Upload as attachment +4. Create knowledge base document +""" + +import json +import logging +import os +from datetime import datetime +from typing import Any, Optional + +from langchain_core.callbacks import CallbackManagerForToolRun +from pydantic import BaseModel, Field + +from shared.telemetry.decorators import trace_async + +logger = logging.getLogger(__name__) + +# Default API base URL for attachment uploads +DEFAULT_API_BASE_URL = "http://backend:8000" + +# Maximum file size for uploads (100 MB) +MAX_UPLOAD_SIZE = 100 * 1024 * 1024 + + +class DingTalkDocToKBInput(BaseModel): + """Input schema for dingtalk_doc_to_kb tool.""" + + dingtalk_doc_url: str = Field( + ..., + description="DingTalk document URL (e.g., https://alidocs.dingtalk.com/i/nodes/xxx)", + ) + knowledge_base_id: int = Field( + ..., + description="Target knowledge base ID", + ) + doc_title: Optional[str] = Field( + default=None, + description="Document title (optional, will be fetched from DingTalk if not provided)", + ) + trigger_indexing: bool = Field( + default=True, + description="Whether to trigger RAG indexing (default: True)", + ) + trigger_summary: bool = Field( + default=True, + description="Whether to trigger summary generation (default: True)", + ) + + +# Import base class here - use try/except to handle both direct and dynamic loading +try: + # Try relative import (for direct usage) + from chat_shell.tools.sandbox._base import BaseSandboxTool +except ImportError: + # Try absolute import (for dynamic loading) + import sys + + # Get the package name dynamically + package_name = __name__.rsplit(".", 1)[0] + _base_module = sys.modules.get(f"{package_name}._base") + if _base_module: + BaseSandboxTool = _base_module.BaseSandboxTool + else: + raise ImportError( + "Cannot import BaseSandboxTool from chat_shell.tools.sandbox._base" + ) + + +class DingTalkDocsToolProvider: + """Tool provider for DingTalk Docs operations. + + This provider creates tools that allow Chat Shell agents to add + DingTalk documents to Wegent knowledge bases. + """ + + @property + def provider_name(self) -> str: + """Return the provider name used in SKILL.md.""" + return "dingtalk-connector" + + @property + def supported_tools(self) -> list[str]: + """Return the list of tools this provider can create.""" + return ["dingtalk_doc_to_kb"] + + def create_tool( + self, + tool_name: str, + context: Any, + tool_config: Optional[dict[str, Any]] = None, + ) -> Any: + """Create a DingTalk Docs tool instance. + + Args: + tool_name: Name of the tool to create + context: Context with dependencies + tool_config: Optional configuration + + Returns: + Configured tool instance + + Raises: + ValueError: If tool_name is unknown + """ + logger.info( + f"[DingTalkDocsProvider] Creating tool: {tool_name}, " + f"task_id={context.task_id}, user_id={context.user_id}" + ) + + if tool_name == "dingtalk_doc_to_kb": + return self._create_dingtalk_doc_to_kb_tool(context, tool_config) + else: + raise ValueError(f"Unknown tool: {tool_name}") + + def _create_dingtalk_doc_to_kb_tool( + self, context: Any, tool_config: Optional[dict[str, Any]] + ) -> "DingTalkDocToKBTool": + """Create the dingtalk_doc_to_kb tool.""" + config = tool_config or {} + + return DingTalkDocToKBTool( + task_id=context.task_id, + subtask_id=context.subtask_id, + ws_emitter=context.ws_emitter, + user_id=context.user_id, + user_name=context.user_name, + bot_config=config.get("bot_config", []), + default_shell_type=config.get("default_shell_type", "ClaudeCode"), + timeout=config.get("timeout", 7200), + auth_token=context.auth_token, + skill_identity_token=context.skill_identity_token, + api_base_url=config.get("api_base_url", ""), + ) + + def validate_config(self, tool_config: dict[str, Any]) -> bool: + """Validate DingTalk Docs tool configuration.""" + if not tool_config: + return True + + # Validate shell_type if present + shell_type = tool_config.get("default_shell_type") + if shell_type is not None: + if shell_type not in ["ClaudeCode", "Agno"]: + return False + + return True + + +class DingTalkDocToKBTool(BaseSandboxTool): + """Tool for adding DingTalk documents to Wegent knowledge base. + + This tool performs the complete workflow: + 1. Starts a sandbox environment + 2. Calls MCP to get document info and content + 3. Saves the document as {title}.{file_extension} + 4. Uploads the file as an attachment + 5. Creates a knowledge base document + """ + + name: str = "dingtalk_doc_to_kb" + display_name: str = "添加钉钉文档到知识库" + description: str = """Add a DingTalk document to Wegent knowledge base. + +This tool downloads a DingTalk document and adds it to a knowledge base. + +Workflow: +1. Fetches document info from DingTalk (title, modification time) +2. Downloads document content +3. Saves as {title}.{file_extension} +4. Uploads as attachment to Wegent +5. Creates knowledge base document + +Parameters: +- dingtalk_doc_url (required): DingTalk document URL +- knowledge_base_id (required): Target knowledge base ID +- doc_title (optional): Document title (fetched from DingTalk if not provided) +- trigger_indexing (optional): Whether to trigger RAG indexing (default: True) +- trigger_summary (optional): Whether to trigger summary generation (default: True) + +Returns: +- success: Whether the operation succeeded +- document_id: Created document ID +- document_name: Document name +- attachment_id: Attachment ID +- message: Status message + +Example: +{ + "dingtalk_doc_url": "https://alidocs.dingtalk.com/i/nodes/xxx", + "knowledge_base_id": 123, + "doc_title": "产品需求文档" +} +""" + + args_schema: type[BaseModel] = DingTalkDocToKBInput + + # Configuration + max_upload_size: int = MAX_UPLOAD_SIZE + api_base_url: str = "" + + def _run( + self, + dingtalk_doc_url: str, + knowledge_base_id: int, + doc_title: Optional[str] = None, + trigger_indexing: bool = True, + trigger_summary: bool = True, + run_manager: Optional[CallbackManagerForToolRun] = None, + ) -> str: + """Synchronous run - not implemented.""" + raise NotImplementedError("DingTalkDocToKBTool only supports async execution") + + @trace_async( + span_name="dingtalk.import", + tracer_name="dingtalk-connector", + extract_attributes=lambda self, dingtalk_doc_url, knowledge_base_id, **kwargs: { + "dingtalk_doc_url": dingtalk_doc_url, + "knowledge_base_id": knowledge_base_id, + }, + ) + async def _arun( + self, + dingtalk_doc_url: str, + knowledge_base_id: int, + doc_title: Optional[str] = None, + trigger_indexing: bool = True, + trigger_summary: bool = True, + run_manager: Optional[CallbackManagerForToolRun] = None, + ) -> str: + """Execute the DingTalk document to knowledge base workflow. + + Args: + dingtalk_doc_url: DingTalk document URL + knowledge_base_id: Target knowledge base ID + doc_title: Optional document title + trigger_indexing: Whether to trigger RAG indexing + trigger_summary: Whether to trigger summary generation + run_manager: Callback manager + + Returns: + JSON string with operation result + """ + logger.info( + f"[DingTalkDocToKBTool] Starting workflow: url={dingtalk_doc_url}, " + f"kb_id={knowledge_base_id}" + ) + + # Emit status update + await self._emit_tool_status( + "running", + f"Fetching DingTalk document info: {dingtalk_doc_url}", + ) + + try: + # Step 1: Get sandbox manager and create sandbox + sandbox_manager = self._get_sandbox_manager() + sandbox, error = await sandbox_manager.get_or_create_sandbox( + shell_type=self.default_shell_type, + workspace_ref=None, + task_type="dingtalk-connector", + ) + + if error: + logger.error(f"[DingTalkDocToKBTool] Failed to create sandbox: {error}") + result = self._format_error( + error_message=f"Failed to create sandbox: {error}", + ) + await self._emit_tool_status("failed", error) + return result + + logger.info(f"[DingTalkDocToKBTool] Sandbox created: {sandbox.sandbox_id}") + + # Step 2: Call MCP to get document info + await self._emit_tool_status( + "running", + "Fetching document information from DingTalk...", + ) + + doc_info = await self._get_document_info_from_mcp(sandbox, dingtalk_doc_url) + + if not doc_info.get("success"): + error_msg = doc_info.get("error", "Failed to get document info") + logger.error(f"[DingTalkDocToKBTool] {error_msg}") + result = self._format_error(error_message=error_msg) + await self._emit_tool_status("failed", error_msg) + return result + + # Use provided title or fetched title + title = doc_title or doc_info.get("title", "DingTalk Document") + + # Step 3: Download document content + await self._emit_tool_status( + "running", + f"Downloading document content: {title}...", + ) + + doc_content_result = await self._download_document_content_real( + sandbox, dingtalk_doc_url + ) + + if not doc_content_result.get("success"): + error_msg = doc_content_result.get( + "error", "Failed to download document" + ) + logger.error(f"[DingTalkDocToKBTool] {error_msg}") + result = self._format_error(error_message=error_msg) + await self._emit_tool_status("failed", error_msg) + return result + + content = doc_content_result.get("content", "") + file_extension = doc_content_result.get("file_extension", "md") + + # Step 4: Save document to sandbox + # Build filename + from app.services.knowledge.orchestrator import _build_filename + + filename = _build_filename(title, file_extension) + file_path = f"/home/user/{filename}" + + await self._emit_tool_status( + "running", + f"Saving document as {filename}...", + ) + + save_result = await self._save_document_to_sandbox( + sandbox, file_path, content + ) + + if not save_result.get("success"): + error_msg = save_result.get("error", "Failed to save document") + logger.error(f"[DingTalkDocToKBTool] {error_msg}") + result = self._format_error(error_message=error_msg) + await self._emit_tool_status("failed", error_msg) + return result + + # Step 5: Upload as attachment + await self._emit_tool_status( + "running", + "Uploading document as attachment...", + ) + + upload_result = await self._upload_attachment(sandbox, file_path) + + if not upload_result.get("success"): + error_msg = upload_result.get("error", "Failed to upload attachment") + logger.error(f"[DingTalkDocToKBTool] {error_msg}") + result = self._format_error(error_message=error_msg) + await self._emit_tool_status("failed", error_msg) + return result + + attachment_id = upload_result.get("attachment_id") + + # Step 6: Create knowledge base document + await self._emit_tool_status( + "running", + "Creating knowledge base document...", + ) + + create_result = await self._create_kb_document( + sandbox, + knowledge_base_id=knowledge_base_id, + doc_title=title, + attachment_id=attachment_id, + trigger_indexing=trigger_indexing, + trigger_summary=trigger_summary, + ) + + if not create_result.get("success"): + error_msg = create_result.get("error", "Failed to create document") + logger.error(f"[DingTalkDocToKBTool] {error_msg}") + result = self._format_error(error_message=error_msg) + await self._emit_tool_status("failed", error_msg) + return result + + document_id = create_result.get("document_id") + + # Success! + result = { + "success": True, + "document_id": document_id, + "document_name": title, + "attachment_id": attachment_id, + "filename": filename, + "message": f"Document '{title}' added to knowledge base successfully", + } + + logger.info( + f"[DingTalkDocToKBTool] Workflow completed: document_id={document_id}" + ) + + await self._emit_tool_status( + "completed", + f"Document '{title}' added to knowledge base successfully", + result, + ) + + return json.dumps(result, ensure_ascii=False, indent=2) + + except Exception as e: + logger.error(f"[DingTalkDocToKBTool] Workflow failed: {e}", exc_info=True) + error_msg = f"Failed to add DingTalk document: {e}" + result = self._format_error(error_message=error_msg) + await self._emit_tool_status("failed", error_msg) + return result + + async def _get_document_info_from_mcp(self, sandbox: Any, doc_url: str) -> dict: + """Get document info from DingTalk via MCP. + + Calls the dingtalk_docs MCP to get real document metadata. + """ + try: + # Extract doc ID from URL + import re + + doc_id = None + patterns = [ + r"alidocs\.dingtalk\.com/i/nodes/([a-zA-Z0-9_-]+)", + r"alidocs\.dingtalk\.com/i/team/[^/]+/docs/([a-zA-Z0-9_-]+)", + r"alidocs\.dingtalk\.com/i/team/[^/]+/wiki/([a-zA-Z0-9_-]+)", + ] + + for pattern in patterns: + match = re.search(pattern, doc_url) + if match: + doc_id = match.group(1) + break + + if not doc_id: + return { + "success": False, + "error": f"Could not extract document ID from URL: {doc_url}", + } + + # Call dingtalk_docs MCP via backend API + api_base_url = self.api_base_url or os.getenv( + "BACKEND_API_URL", DEFAULT_API_BASE_URL + ) + api_base_url = api_base_url.rstrip("/") + + auth_token = self.auth_token + if not auth_token: + return { + "success": False, + "error": "No authentication token available", + } + + mcp_url = f"{api_base_url}/mcp/knowledge/sse" + + # Build MCP tool call payload + payload = { + "jsonrpc": "2.0", + "method": "tools/call", + "params": { + "name": "get_dingtalk_document_info", + "arguments": {"doc_url": doc_url}, + }, + "id": 1, + } + + # Write payload to a temporary file with unique name + import uuid + + payload_file = f"/tmp/mcp_payload_{uuid.uuid4().hex}.json" + payload_json = json.dumps(payload) + await sandbox.files.write(payload_file, payload_json) + + try: + # Build curl command using array style + import shlex + + curl_cmd = [ + "curl", + "-s", + "-X", + "POST", + "-H", + f"Authorization: Bearer {auth_token}", + "-H", + "Content-Type: application/json", + "-d", + f"@{payload_file}", + mcp_url, + ] + + result_obj = await sandbox.commands.run( + cmd=shlex.join(curl_cmd), + cwd="/home/user", + timeout=60, + ) + + if result_obj.exit_code != 0: + return { + "success": False, + "error": f"MCP call failed: {result_obj.stderr or 'Unknown error'}", + } + + # Parse response + response = json.loads(result_obj.stdout) + + if "error" in response: + return { + "success": False, + "error": response["error"].get("message", "Unknown error"), + } + + result_content = response.get("result", {}).get("content", []) + if result_content: + tool_result = json.loads(result_content[0].get("text", "{}")) + if tool_result.get("success"): + return { + "success": True, + "doc_id": tool_result.get("doc_id", doc_id), + "title": tool_result.get( + "title", f"DingTalkDoc_{doc_id[:8]}" + ), + "modified_time": tool_result.get( + "modified_time", datetime.now().isoformat() + ), + } + else: + return { + "success": False, + "error": tool_result.get("error", "Unknown error"), + } + + return {"success": False, "error": "Empty response from MCP"} + + finally: + # Clean up temp file + try: + await sandbox.commands.run( + cmd=f"rm -f {payload_file}", + cwd="/home/user", + timeout=10, + ) + except Exception: + pass + + except Exception as e: + return {"success": False, "error": str(e)} + + async def _download_document_content(self, sandbox: Any, doc_url: str) -> dict: + """Download document content from DingTalk. + + For now, this returns placeholder content. + In production, this would call the DingTalk API via MCP. + """ + try: + # Extract doc ID for content generation + import re + + doc_id = None + patterns = [ + r"alidocs\.dingtalk\.com/i/nodes/([a-zA-Z0-9_-]+)", + r"alidocs\.dingtalk\.com/i/team/[^/]+/docs/([a-zA-Z0-9_-]+)", + ] + + for pattern in patterns: + match = re.search(pattern, doc_url) + if match: + doc_id = match.group(1) + break + + # Generate placeholder content + content = f"""# DingTalk Document + +This document was imported from DingTalk. + +**Source URL:** {doc_url} +**Document ID:** {doc_id or "unknown"} +**Imported at:** {datetime.now().isoformat()} + +## Content + +The actual content would be fetched from DingTalk API in production. +This is a placeholder for the document content. + +--- +*Imported by Wegent DingTalk Docs Skill* +""" + + return {"success": True, "content": content} + + except Exception as e: + return {"success": False, "error": str(e)} + + async def _download_document_content_real(self, sandbox: Any, doc_url: str) -> dict: + """Download document content from DingTalk via MCP. + + Calls the dingtalk_docs MCP to get real document content. + """ + try: + # Extract doc ID from URL + import re + + doc_id = None + patterns = [ + r"alidocs\.dingtalk\.com/i/nodes/([a-zA-Z0-9_-]+)", + r"alidocs\.dingtalk\.com/i/team/[^/]+/docs/([a-zA-Z0-9_-]+)", + r"alidocs\.dingtalk\.com/i/team/[^/]+/wiki/([a-zA-Z0-9_-]+)", + ] + + for pattern in patterns: + match = re.search(pattern, doc_url) + if match: + doc_id = match.group(1) + break + + if not doc_id: + return { + "success": False, + "error": f"Could not extract document ID from URL: {doc_url}", + } + + # Call dingtalk_docs MCP via backend API + api_base_url = self.api_base_url or os.getenv( + "BACKEND_API_URL", DEFAULT_API_BASE_URL + ) + api_base_url = api_base_url.rstrip("/") + + auth_token = self.auth_token + if not auth_token: + return { + "success": False, + "error": "No authentication token available", + } + + mcp_url = f"{api_base_url}/mcp/knowledge/sse" + + # Build MCP tool call payload for download_dingtalk_document + payload = { + "jsonrpc": "2.0", + "method": "tools/call", + "params": { + "name": "download_dingtalk_document", + "arguments": { + "doc_url": doc_url, + }, + }, + "id": 1, + } + + # Write payload to a temporary file with unique name + import uuid + + payload_file = f"/tmp/mcp_download_payload_{uuid.uuid4().hex}.json" + payload_json = json.dumps(payload) + await sandbox.files.write(payload_file, payload_json) + + try: + # Build curl command using array style + import shlex + + curl_cmd = [ + "curl", + "-s", + "-X", + "POST", + "-H", + f"Authorization: Bearer {auth_token}", + "-H", + "Content-Type: application/json", + "-d", + f"@{payload_file}", + mcp_url, + ] + + result_obj = await sandbox.commands.run( + cmd=shlex.join(curl_cmd), + cwd="/home/user", + timeout=120, # Longer timeout for download + ) + + if result_obj.exit_code != 0: + return { + "success": False, + "error": f"MCP download failed: {result_obj.stderr or 'Unknown error'}", + } + + # Parse response + response = json.loads(result_obj.stdout) + + if "error" in response: + return { + "success": False, + "error": response["error"].get("message", "Unknown error"), + } + + result_content = response.get("result", {}).get("content", []) + if result_content: + tool_result = json.loads(result_content[0].get("text", "{}")) + if tool_result.get("success"): + content = tool_result.get("content", "") + if not content: + return { + "success": False, + "error": "Empty content returned from DingTalk", + } + return {"success": True, "content": content} + else: + return { + "success": False, + "error": tool_result.get("error", "Unknown error"), + } + + return {"success": False, "error": "Empty response from MCP"} + + finally: + # Clean up temp file + try: + await sandbox.commands.run( + cmd=f"rm -f {payload_file}", + cwd="/home/user", + timeout=10, + ) + except Exception: + pass + + except Exception as e: + return {"success": False, "error": str(e)} + + async def _save_document_to_sandbox( + self, sandbox: Any, file_path: str, content: str + ) -> dict: + """Save document content to sandbox file.""" + try: + # Write content to file in sandbox + await sandbox.files.write(file_path, content) + return {"success": True, "file_path": file_path} + except Exception as e: + return {"success": False, "error": f"Failed to save file: {e}"} + + async def _upload_attachment(self, sandbox: Any, file_path: str) -> dict: + """Upload file from sandbox as attachment.""" + try: + # Get API base URL and auth token + api_base_url = self.api_base_url or os.getenv( + "BACKEND_API_URL", DEFAULT_API_BASE_URL + ) + api_base_url = api_base_url.rstrip("/") + + auth_token = self.auth_token + if not auth_token: + return { + "success": False, + "error": "No authentication token available for upload", + } + + upload_url = f"{api_base_url}/api/attachments/upload" + + # Build curl command with shlex.quote to prevent shell injection + import shlex + + curl_cmd = [ + "curl", + "-s", + "-X", + "POST", + "-H", + f"Authorization: Bearer {auth_token}", + "-F", + f"file=@{file_path}", + upload_url, + ] + + # Execute curl command (pass as list to avoid shell interpretation) + result_obj = await sandbox.commands.run( + cmd=shlex.join(curl_cmd), + cwd="/home/user", + timeout=300, + ) + + if result_obj.exit_code != 0: + return { + "success": False, + "error": f"Upload failed: {result_obj.stderr or 'Unknown error'}", + } + + # Parse JSON response + api_response = json.loads(result_obj.stdout) + + if "detail" in api_response: + error_detail = api_response["detail"] + error_msg = ( + error_detail.get("message", str(error_detail)) + if isinstance(error_detail, dict) + else str(error_detail) + ) + return {"success": False, "error": f"Upload API error: {error_msg}"} + + attachment_id = api_response.get("id") + if not attachment_id: + return {"success": False, "error": "No attachment ID in response"} + + return {"success": True, "attachment_id": attachment_id} + + except json.JSONDecodeError as e: + return {"success": False, "error": f"Failed to parse upload response: {e}"} + except Exception as e: + return {"success": False, "error": f"Upload failed: {e}"} + + async def _create_kb_document( + self, + sandbox: Any, + knowledge_base_id: int, + doc_title: str, + attachment_id: int, + trigger_indexing: bool, + trigger_summary: bool, + ) -> dict: + """Create knowledge base document using MCP.""" + try: + # Get API base URL and auth token + api_base_url = self.api_base_url or os.getenv( + "BACKEND_API_URL", DEFAULT_API_BASE_URL + ) + api_base_url = api_base_url.rstrip("/") + + auth_token = self.auth_token + if not auth_token: + return { + "success": False, + "error": "No authentication token available", + } + + # Call the add_dingtalk_doc_with_attachment MCP tool via API + # Since we can't directly call MCP from sandbox, we use the knowledge API + mcp_url = f"{api_base_url}/mcp/knowledge/sse" + + # Build the tool call payload + payload = { + "jsonrpc": "2.0", + "method": "tools/call", + "params": { + "name": "add_dingtalk_doc_with_attachment", + "arguments": { + "knowledge_base_id": knowledge_base_id, + "doc_title": doc_title, + "attachment_id": attachment_id, + "trigger_indexing": trigger_indexing, + "trigger_summary": trigger_summary, + }, + }, + "id": 1, + } + + # Write payload to a temporary file with unique name to avoid race conditions + import uuid + + payload_file = f"/tmp/mcp_payload_{uuid.uuid4().hex}.json" + payload_json = json.dumps(payload) + await sandbox.files.write(payload_file, payload_json) + + try: + # Build curl command using array style to prevent shell injection + import shlex + + curl_cmd = [ + "curl", + "-s", + "-X", + "POST", + "-H", + f"Authorization: Bearer {auth_token}", + "-H", + "Content-Type: application/json", + "-d", + f"@{payload_file}", + mcp_url, + ] + + result_obj = await sandbox.commands.run( + cmd=shlex.join(curl_cmd), + cwd="/home/user", + timeout=60, + ) + + # Clean up temp file after use + try: + await sandbox.commands.run( + cmd=f"rm -f {payload_file}", + cwd="/home/user", + timeout=10, + ) + except Exception: + pass + + except Exception as e: + # Clean up temp file on error + try: + await sandbox.commands.run( + cmd=f"rm -f {payload_file}", + cwd="/home/user", + timeout=10, + ) + except Exception: + pass + raise e + + if result_obj.exit_code != 0: + return { + "success": False, + "error": f"MCP call failed: {result_obj.stderr or 'Unknown error'}", + } + + # Parse response + response = json.loads(result_obj.stdout) + + if "error" in response: + return { + "success": False, + "error": response["error"].get("message", "Unknown error"), + } + + result_content = response.get("result", {}).get("content", []) + if result_content: + tool_result = json.loads(result_content[0].get("text", "{}")) + if tool_result.get("success"): + return { + "success": True, + "document_id": tool_result.get("document_id"), + } + else: + return { + "success": False, + "error": tool_result.get("error", "Unknown error"), + } + + return {"success": False, "error": "Empty response from MCP"} + + except json.JSONDecodeError as e: + return {"success": False, "error": f"Failed to parse MCP response: {e}"} + except Exception as e: + return {"success": False, "error": f"Failed to create document: {e}"}