Skip to content
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import annotations

import asyncio
import base64
import io
import os
import sys
from collections.abc import AsyncGenerator, AsyncIterator
from typing import TYPE_CHECKING, Literal
from urllib.parse import parse_qs, urlparse

import aiohttp
import cv2
Expand Down Expand Up @@ -48,6 +50,45 @@
_AVATAR_AGENT_NAME = "bithuman-avatar-agent"


def _is_valid_base64(s: str) -> bool:
"""
Strictly validate if a string is valid base64 encoded data.

Args:
s: String to validate

Returns:
True if the string is valid base64, False otherwise
"""
import re

# Base64 strings should only contain A-Z, a-z, 0-9, +, /, and = for padding
# Remove whitespace for validation
s_clean = s.strip().replace(" ", "").replace("\n", "").replace("\r", "").replace("\t", "")

# Check if string is empty after cleaning
if not s_clean:
return False

# Base64 strings must have length that is a multiple of 4 (after padding)
# Padding can be 0, 1, or 2 '=' characters
if len(s_clean) % 4 != 0:
return False

# Check if string contains only valid base64 characters
base64_pattern = re.compile(r"^[A-Za-z0-9+/]*={0,2}$")
if not base64_pattern.match(s_clean):
return False

# Try to decode and verify it doesn't raise an exception
try:
decoded = base64.b64decode(s_clean)
# Additional check: decoded data should not be empty
return len(decoded) > 0
except Exception:
return False


class BitHumanException(Exception):
"""Exception for BitHuman errors"""

Expand Down Expand Up @@ -158,7 +199,9 @@ def __init__(
elif isinstance(avatar_image, str):
if os.path.exists(avatar_image):
self._avatar_image = Image.open(avatar_image)
elif avatar_image.startswith("http"):
elif avatar_image.startswith(("http://", "https://")):
self._avatar_image = avatar_image
elif _is_valid_base64(avatar_image):
self._avatar_image = avatar_image
else:
raise BitHumanException(f"Invalid avatar image: {avatar_image}")
Expand Down Expand Up @@ -307,8 +350,56 @@ async def _start_cloud_agent(
self, livekit_url: str, livekit_token: str, room_name: str
) -> None:
assert self._api_url is not None, "api_url is not set"
assert self._api_secret is not None, "api_secret is not set"

# Determine if using custom API endpoint (not the default BitHuman auth API)
# Custom endpoints use multipart/form-data format for direct avatar worker requests
is_custom_endpoint = not self._is_default_api_url()

if is_custom_endpoint:
# Use FormData format for custom endpoints
# Parse async parameter from URL if present
async_mode = self._parse_async_parameter_from_url()
await self._send_formdata_request(
livekit_url, livekit_token, room_name, async_mode=async_mode
)
else:
# Default BitHuman API requires api_secret
assert self._api_secret is not None, "api_secret is not set"
# Use JSON format for default BitHuman API
await self._send_json_request(livekit_url, livekit_token, room_name)

def _is_default_api_url(self) -> bool:
"""
Check if using the default BitHuman API URL.

Returns:
True if using default auth.api.bithuman.ai endpoint, False otherwise.
"""
if self._api_url is None:
return True
try:
parsed = urlparse(self._api_url)
hostname = parsed.hostname
if hostname is None:
return False
default_domains = ["auth.api.bithuman.ai", "api.bithuman.ai"]
return hostname in default_domains
except Exception:
# If parsing fails, fallback to substring matching
default_domains = ["auth.api.bithuman.ai", "api.bithuman.ai"]
return any(domain in self._api_url for domain in default_domains)

async def _send_json_request(
self, livekit_url: str, livekit_token: str, room_name: str
) -> None:
"""
Send request using JSON format (for default BitHuman API).

Args:
livekit_url: LiveKit server URL
livekit_token: JWT token for room access
room_name: Name of the LiveKit room
"""
# Prepare JSON data
json_data = {
"livekit_url": livekit_url,
Expand All @@ -320,35 +411,174 @@ async def _start_cloud_agent(
else "cpu",
}

# Handle avatar image
# Handle avatar image - convert to base64 for JSON serialization
if isinstance(self._avatar_image, Image.Image):
img_byte_arr = io.BytesIO()
self._avatar_image.save(img_byte_arr, format="JPEG", quality=95)
img_byte_arr.seek(0)
# Convert to base64 for JSON serialization
import base64

json_data["image"] = base64.b64encode(img_byte_arr.getvalue()).decode("utf-8")
elif isinstance(self._avatar_image, bytes):
# Convert bytes to base64 for JSON serialization
import base64

json_data["image"] = base64.b64encode(self._avatar_image).decode("utf-8")
elif isinstance(self._avatar_image, str):
json_data["image"] = self._avatar_image

if utils.is_given(self._avatar_id):
json_data["agent_id"] = self._avatar_id

assert self._api_secret is not None, "api_secret is required for default API"

headers = {
"Content-Type": "application/json",
"api-secret": self._api_secret,
}

await self._send_request_with_retry(
headers=headers,
json_data=json_data,
form_data=None,
)

def _parse_async_parameter_from_url(self) -> bool | None:
"""
Parse async parameter from api_url if present.

Returns:
True if async=true, False if async=false, None if not present
"""
if self._api_url is None:
return None

try:
parsed = urlparse(self._api_url)
query_params = parse_qs(parsed.query)

if "async" in query_params:
async_value = query_params["async"][0].lower()
if async_value == "true":
return True
elif async_value == "false":
return False
except Exception:
# If parsing fails, return None (don't add async_mode parameter)
pass

return None

async def _send_formdata_request(
self, livekit_url: str, livekit_token: str, room_name: str, async_mode: bool | None = None
) -> None:
"""
Send request using multipart/form-data format (for custom avatar worker endpoints).

This format is used for direct communication with avatar workers like:
- gpu-avatar-worker (FLOAT model)
- cpu-avatar-worker
- Cerebrium deployments

Args:
livekit_url: LiveKit server URL
livekit_token: JWT token for room access
room_name: Name of the LiveKit room
async_mode: Optional async_mode parameter (parsed from URL if async parameter present)
"""
# Build form data with required fields
form_data = aiohttp.FormData()
form_data.add_field("livekit_url", livekit_url)
form_data.add_field("livekit_token", livekit_token)
form_data.add_field("room_name", room_name)

# Add async_mode parameter if parsed from URL
# FastAPI Form bool accepts "true"/"false" strings and converts them to boolean
if async_mode is not None:
form_data.add_field("async_mode", "true" if async_mode else "false")

# Handle avatar image - send as file upload or URL
if isinstance(self._avatar_image, Image.Image):
# Convert PIL Image to bytes and upload as file
img_byte_arr = io.BytesIO()
self._avatar_image.save(img_byte_arr, format="JPEG", quality=95)
img_byte_arr.seek(0)
form_data.add_field(
"avatar_image",
img_byte_arr,
filename="avatar.jpg",
content_type="image/jpeg",
)
elif isinstance(self._avatar_image, str):
# String can be URL or base64 - check if it's a URL
if self._avatar_image.startswith(("http://", "https://")):
form_data.add_field("avatar_image_url", self._avatar_image)
elif _is_valid_base64(self._avatar_image):
# Valid base64 string, decode and upload as file
try:
decoded_image = base64.b64decode(self._avatar_image)
img_byte_arr = io.BytesIO(decoded_image)
form_data.add_field(
"avatar_image",
img_byte_arr,
filename="avatar.jpg",
content_type="image/jpeg",
)
except Exception as err:
# If decode fails despite validation, raise error
raise BitHumanException(
f"Failed to decode base64 avatar image: {self._avatar_image[:50]}..."
) from err
else:
# Not a URL and not valid base64, raise error
raise BitHumanException(
f"Invalid avatar image string: must be a URL (starting with http:// or https://) "
f"or valid base64 encoded data. Got: {self._avatar_image[:50]}..."
)

# Add avatar_id if provided
if utils.is_given(self._avatar_id):
form_data.add_field("avatar_id", self._avatar_id)

# Authorization header for custom endpoints uses api_token (Bearer token format)
# Note: api_token is different from api_secret - token is for direct API access,
# while secret is for BitHuman's authentication service
auth_token = self._api_token or self._api_secret
if auth_token is None:
raise BitHumanException(
"api_token or api_secret is required for custom endpoint requests. "
"Set BITHUMAN_API_TOKEN or BITHUMAN_API_SECRET environment variable."
)

headers = {
"Authorization": f"Bearer {auth_token}",
}

await self._send_request_with_retry(
headers=headers,
json_data=None,
form_data=form_data,
)

async def _send_request_with_retry(
self,
headers: dict[str, str],
json_data: dict | None = None,
form_data: aiohttp.FormData | None = None,
) -> None:
"""
Send HTTP request with retry logic.

Handles both JSON and FormData request formats with configurable retry behavior.

Args:
headers: HTTP headers to include in the request
json_data: JSON payload (mutually exclusive with form_data)
form_data: FormData payload (mutually exclusive with json_data)

Raises:
APIConnectionError: If all retry attempts fail
"""
for i in range(self._conn_options.max_retry):
try:
async with self._ensure_http_session().post(
self._api_url,
headers={
"Content-Type": "application/json",
"api-secret": self._api_secret,
},
headers=headers,
json=json_data,
data=form_data,
timeout=aiohttp.ClientTimeout(sock_connect=self._conn_options.timeout),
) as response:
if not response.ok:
Expand Down