Skip to content

Conversation

@jer96
Copy link
Contributor

@jer96 jer96 commented Aug 1, 2025

Description

Support A2A FileParts and DataParts in the Strands A2A Executor.

FilePart Support

FileParts allow agents to receive file attachments in their messages. These can contain:

  • Binary data (via bytes field) - images, documents, videos, etc.
  • File references (via URI field) - references to files stored elsewhere

The implementation automatically classifies files by MIME type (image, video, document) and converts them to appropriate Strands ContentBlocks using Python's built-in mimetypes library.

DataPart Support

DataParts enable agents to receive structured data alongside text and files. These contain JSON-serializable objects that provide:

  • Structured context - metadata, configuration, user profiles
  • API payloads - structured data for processing
  • Rich content - complex data that doesn't fit in plain text

DataParts are converted to formatted JSON text blocks so agents can understand and process the structured information.

Use Cases

  • Multi-modal agents that need to process text, images, and documents together
  • Data processing agents that receive structured input alongside instructions
  • File analysis agents that need both the file content and metadata
  • Rich messaging where clients can send complex, structured requests

This allows A2A clients to send richer, more complex messages to agents while maintaining compatibility with the existing text-based interface.

Related Issues

Documentation PR

Type of Change

New feature

Testing

How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli

  • I ran hatch run prepare

Custom test script

"""
A2A Test Client with Support for Multiple Message Part Types

This client demonstrates how to send messages with different part types to an A2A agent:
- TextPart: Plain text content
- FilePart: File content (from bytes or URI)
- DataPart: Structured data (JSON objects)

The client supports:
1. Simple text messages
2. File attachments (from local files or URIs)
3. Structured data payloads
4. Mixed messages with multiple part types
5. Utility functions for common scenarios

Usage Examples:
    # Simple text message
    asyncio.run(demo_text_message())

    # File with structured data
    asyncio.run(demo_mixed_message())

    # All demos
    asyncio.run(main())
"""

import asyncio
import logging
import base64
from pathlib import Path
from typing import Any, Union, Dict, List
from uuid import uuid4

import httpx
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import MessageSendParams, SendMessageRequest

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def create_text_part(text: str, metadata: Dict[str, Any] = None) -> Dict[str, Any]:
    """Create a text part for a message."""
    part = {
        "kind": "text",
        "text": text,
    }
    if metadata:
        part["metadata"] = metadata
    return part


def create_file_part_from_bytes(
    file_bytes: bytes,
    mime_type: str = None,
    name: str = None,
    metadata: Dict[str, Any] = None
) -> Dict[str, Any]:
    """Create a file part from bytes."""
    file_data = {
        "bytes": base64.b64encode(file_bytes).decode("utf-8")
    }
    if mime_type:
        file_data["mime_type"] = mime_type
    if name:
        file_data["name"] = name

    part = {
        "kind": "file",
        "file": file_data,
    }
    if metadata:
        part["metadata"] = metadata
    return part


def create_file_part_from_uri(
    uri: str,
    mime_type: str = None,
    name: str = None,
    metadata: Dict[str, Any] = None
) -> Dict[str, Any]:
    """Create a file part from a URI."""
    file_data = {
        "uri": uri
    }
    if mime_type:
        file_data["mime_type"] = mime_type
    if name:
        file_data["name"] = name

    part = {
        "kind": "file",
        "file": file_data,
    }
    if metadata:
        part["metadata"] = metadata
    return part


def create_data_part(data: Dict[str, Any], metadata: Dict[str, Any] = None) -> Dict[str, Any]:
    """Create a data part for structured data."""
    part = {
        "kind": "data",
        "data": data,
    }
    if metadata:
        part["metadata"] = metadata
    return part


def create_file_part_from_path(
    file_path: Union[str, Path],
    mime_type: str = None,
    name: str = None,
    metadata: Dict[str, Any] = None
) -> Dict[str, Any]:
    """Create a file part by loading from a local file path."""
    path = Path(file_path)

    if not path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    with open(path, 'rb') as f:
        file_bytes = f.read()

    # Auto-detect name if not provided
    if name is None:
        name = path.name

    # Simple mime type detection based on file extension
    if mime_type is None:
        extension = path.suffix.lower()
        mime_types = {
            '.txt': 'text/plain',
            '.json': 'application/json',
            '.csv': 'text/csv',
            '.xml': 'application/xml',
            '.html': 'text/html',
            '.pdf': 'application/pdf',
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.png': 'image/png',
            '.gif': 'image/gif',
            '.svg': 'image/svg+xml',
            '.mp3': 'audio/mpeg',
            '.wav': 'audio/wav',
            '.mp4': 'video/mp4',
            '.zip': 'application/zip',
            '.doc': 'application/msword',
            '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        }
        mime_type = mime_types.get(extension, 'application/octet-stream')

    return create_file_part_from_bytes(file_bytes, mime_type, name, metadata)


def create_json_data_part(json_data: Any, metadata: Dict[str, Any] = None) -> Dict[str, Any]:
    """Create a data part from JSON-serializable data."""
    if isinstance(json_data, (str, bytes)):
        import json
        if isinstance(json_data, bytes):
            json_data = json_data.decode('utf-8')
        parsed_data = json.loads(json_data)
    else:
        parsed_data = json_data

    return create_data_part(parsed_data, metadata)


def create_multipart_message(
    text: str = None,
    file_paths: List[Union[str, Path]] = None,
    file_uris: List[str] = None,
    data_objects: List[Dict[str, Any]] = None,
    role: str = "user"
) -> Dict[str, Any]:
    """Create a message with multiple parts from various sources.

    Args:
        text: Optional text content
        file_paths: List of local file paths to include
        file_uris: List of URIs to include as file parts
        data_objects: List of structured data objects to include
        role: Message role ('user' or 'agent')
    """
    parts = []

    if text:
        parts.append(create_text_part(text))

    if file_paths:
        for file_path in file_paths:
            parts.append(create_file_part_from_path(file_path))

    if file_uris:
        for uri in file_uris:
            parts.append(create_file_part_from_uri(uri))

    if data_objects:
        for data_obj in data_objects:
            parts.append(create_data_part(data_obj))

    if not parts:
        raise ValueError("At least one content type must be provided")

    return create_message_payload(role=role, parts=parts)


def create_message_payload(
    *,
    role: str = "user",
    parts: List[Dict[str, Any]] = None,
    text: str = None,
    message_id: str = None
) -> Dict[str, Any]:
    """Create a message payload with support for multiple part types.

    Args:
        role: The role of the message sender ('user' or 'agent')
        parts: List of message parts (text, file, or data parts)
        text: Simple text content (creates a text part if parts is None)
        message_id: Optional message ID (generates one if not provided)
    """
    if parts is None:
        if text is None:
            raise ValueError("Either 'parts' or 'text' must be provided")
        parts = [create_text_part(text)]

    return {
        "message": {
            "role": role,
            "parts": parts,
            "messageId": message_id or uuid4().hex,
        },
    }

async def send_sync_message(message: str, base_url: str = "http://localhost:9000"):
    async with httpx.AsyncClient() as httpx_client:
        # Get agent card
        resolver = A2ACardResolver(httpx_client=httpx_client, base_url=base_url)
        agent_card = await resolver.get_agent_card()

        # Create client
        client = A2AClient(httpx_client=httpx_client, agent_card=agent_card)

        # Send message
        payload = create_message_payload(text=message)
        request = SendMessageRequest(id=str(uuid4()), params=MessageSendParams(**payload))

        response = await client.send_message(request)
        logger.info(response.model_dump_json(exclude_none=True, indent=2))
        return response


async def send_sync_message(message: str, base_url: str = "http://localhost:9000"):
    async with httpx.AsyncClient() as httpx_client:
        # Get agent card
        resolver = A2ACardResolver(httpx_client=httpx_client, base_url=base_url)
        agent_card = await resolver.get_agent_card()

        # Create client
        client = A2AClient(httpx_client=httpx_client, agent_card=agent_card)

        # Send message
        payload = create_message_payload(text=message)
        request = SendMessageRequest(id=str(uuid4()), params=MessageSendParams(**payload))

        response = await client.send_message(request)
        logger.info(response.model_dump_json(exclude_none=True, indent=2))
        return response


async def send_message_with_parts(parts: List[Dict[str, Any]], base_url: str = "http://localhost:9000"):
    """Send a message with custom parts to the agent.

    Args:
        parts: List of message parts (text, file, or data)
        base_url: Base URL of the A2A agent

    Returns:
        The response from the agent

    Raises:
        httpx.HTTPError: If there's a network error
        Exception: If the agent returns an error response
    """
    try:
        async with httpx.AsyncClient(timeout=30.0) as httpx_client:
            # Get agent card
            resolver = A2ACardResolver(httpx_client=httpx_client, base_url=base_url)
            agent_card = await resolver.get_agent_card()

            # Create client
            client = A2AClient(httpx_client=httpx_client, agent_card=agent_card)

            # Send message
            payload = create_message_payload(parts=parts)
            request = SendMessageRequest(id=str(uuid4()), params=MessageSendParams(**payload))

            response = await client.send_message(request)
            logger.info("Response received:")
            logger.info(response.model_dump_json(exclude_none=True, indent=2))
            return response

    except httpx.TimeoutException:
        logger.error("Request timed out")
        raise
    except httpx.HTTPError as e:
        logger.error(f"HTTP error occurred: {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise


async def demo_text_message():
    """Demo: Send a simple text message."""
    logger.info("=== Demo 1: Text Message ===")
    try:
        text_part = create_text_part("What is 101 * 11?")
        await send_message_with_parts([text_part])
        logger.info("✓ Text message demo completed successfully")
    except Exception as e:
        logger.error(f"✗ Text message demo failed: {e}")


async def demo_file_message():
    """Demo: Send a message with a file (bytes)."""
    logger.info("=== Demo 2: File Message (from bytes) ===")
    try:
        # Create a simple text file content
        file_content = "This is a sample document.\nIt contains multiple lines.\nHere's some data: 123, 456, 789"
        file_bytes = file_content.encode('utf-8')

        # Create parts
        text_part = create_text_part("Please analyze this document:")
        file_part = create_file_part_from_bytes(
            file_bytes,
            mime_type="text/plain",
            name="sample.txt"
        )

        await send_message_with_parts([text_part, file_part])
        logger.info("✓ File message demo completed successfully")
    except Exception as e:
        logger.error(f"✗ File message demo failed: {e}")


async def demo_data_message():
    """Demo: Send a message with structured data."""
    logger.info("=== Demo 3: Structured Data Message ===")
    try:
        # Create structured data
        data = {
            "user_info": {
                "name": "John Doe",
                "age": 30,
                "preferences": ["cooking", "reading", "hiking"]
            },
            "request_type": "recipe_recommendation",
            "dietary_restrictions": ["vegetarian"],
            "cooking_time": "30 minutes"
        }

        # Create parts
        text_part = create_text_part("Based on this user profile, what recipe would you recommend?")
        data_part = create_data_part(data)

        await send_message_with_parts([text_part, data_part])
        logger.info("✓ Data message demo completed successfully")
    except Exception as e:
        logger.error(f"✗ Data message demo failed: {e}")


async def demo_mixed_message():
    """Demo: Send a message with multiple part types."""
    logger.info("=== Demo 4: Mixed Message (Text + File + Data) ===")
    try:
        # Create a JSON file content
        json_data = '{"temperature": 25.5, "humidity": 60, "location": "kitchen"}'
        json_bytes = json_data.encode('utf-8')

        # Create structured metadata
        metadata = {
            "sensor_id": "kitchen_001",
            "timestamp": "2024-01-15T10:30:00Z",
            "calibration_date": "2024-01-01"
        }

        # Create all part types
        text_part = create_text_part("Here's the latest sensor data with metadata:")
        file_part = create_file_part_from_bytes(
            json_bytes,
            mime_type="application/json",
            name="sensor_data.json"
        )
        data_part = create_data_part(metadata)

        await send_message_with_parts([text_part, file_part, data_part])
        logger.info("✓ Mixed message demo completed successfully")
    except Exception as e:
        logger.error(f"✗ Mixed message demo failed: {e}")


async def demo_file_from_uri():
    """Demo: Send a message with a file reference (URI)."""
    logger.info("=== Demo 5: File from URI ===")
    try:
        # Create parts
        text_part = create_text_part("Please analyze this image:")
        file_part = create_file_part_from_uri(
            uri="https://example.com/images/sample.jpg",
            mime_type="image/jpeg",
            name="sample.jpg"
        )

        await send_message_with_parts([text_part, file_part])
        logger.info("✓ File from URI demo completed successfully")
    except Exception as e:
        logger.error(f"✗ File from URI demo failed: {e}")


async def demo_utility_functions():
    """Demo: Use utility functions for common scenarios."""
    logger.info("=== Demo 6: Utility Functions ===")
    try:
        # Example 1: Simple multipart message (if you have local files)
        # Uncomment and modify paths as needed:
        """
        payload = create_multipart_message(
            text="Please analyze these files:",
            file_paths=["./sample.txt", "./data.json"],
            data_objects=[{"context": "batch_analysis", "priority": "high"}]
        )
        request = SendMessageRequest(id=str(uuid4()), params=MessageSendParams(**payload))
        """

        # Example 2: JSON data part from string
        json_string = '{"metrics": {"accuracy": 0.95, "precision": 0.92}, "model": "v2.1"}'
        json_part = create_json_data_part(json_string)
        text_part = create_text_part("Here are the model performance metrics:")

        await send_message_with_parts([text_part, json_part])
        logger.info("✓ Utility functions demo completed successfully")
    except Exception as e:
        logger.error(f"✗ Utility functions demo failed: {e}")


async def main():
    """Run all demos with error handling."""
    demos = [
        ("Simple text message", demo_text_message),
        ("File message (from bytes)", demo_file_message),
        ("Structured data message", demo_data_message),
        ("Mixed message (multiple parts)", demo_mixed_message),
        ("File from URI", demo_file_from_uri),
        ("Utility functions", demo_utility_functions),
    ]

    results = {"success": 0, "failed": 0}

    for demo_name, demo_func in demos:
        try:
            logger.info(f"\n{'='*50}")
            logger.info(f"Running: {demo_name}")
            logger.info(f"{'='*50}")
            await demo_func()
            results["success"] += 1
        except Exception as e:
            logger.error(f"Demo '{demo_name}' failed: {e}")
            results["failed"] += 1
            # Continue with other demos

    logger.info(f"\n{'='*50}")
    logger.info(f"Demo Results: {results['success']} successful, {results['failed']} failed")
    logger.info(f"{'='*50}")


if __name__ == "__main__":
    # You can run individual demos:
    # asyncio.run(demo_text_message())
    asyncio.run(demo_file_message())
    # asyncio.run(demo_data_message())
    # asyncio.run(demo_mixed_message())
    # asyncio.run(demo_file_from_uri())
    # asyncio.run(demo_utility_functions())

    # Run a simple text message demo by default (uncomment to use):
    # asyncio.run(demo_text_message())

    # Run all demos (comment out to run individual demos):
    asyncio.run(main())

Checklist

  • I have read the CONTRIBUTING document
  • I have added any necessary tests that prove my fix is effective or my feature works
  • I have updated the documentation accordingly
  • I have added an appropriate example to the documentation to outline the feature, or no new docs are needed
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@jer96 jer96 marked this pull request as ready for review August 1, 2025 20:38
@jer96 jer96 self-assigned this Aug 12, 2025
@jer96 jer96 enabled auto-merge (squash) August 12, 2025 19:22
Copy link
Contributor Author

@jer96 jer96 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-- deleted AI summary --

dbschmigelski
dbschmigelski previously approved these changes Aug 20, 2025
@jer96 jer96 merged commit ef18a25 into strands-agents:main Aug 20, 2025
12 checks passed
dbschmigelski pushed a commit to dbavro19/sdk-python that referenced this pull request Aug 28, 2025
This was referenced Sep 17, 2025
Unshure pushed a commit to Unshure/sdk-python that referenced this pull request Sep 24, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants