Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 78 additions & 23 deletions src/mcp_server/core/live_streaming/live_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,33 @@
class LiveStreamingService:
def __init__(self, cfg: config.Config = None):
self.config = cfg
self.api_key = cfg.live_api_key if cfg else None
self.live_api_key = cfg.live_api_key if cfg else None
self.live_endpoint = cfg.live_endpoint if cfg else "mls.cn-east-1.qiniumiku.com"
self.access_key = cfg.access_key if cfg else None
self.secret_key = cfg.secret_key if cfg else None


def _get_auth_header(self, method: str, url: str, content_type: Optional[str] = None, body: Optional[str] = None) -> Dict[str, str]:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CRITICAL: Missing error handling for unconfigured authentication

This method now returns None implicitly when neither authentication method is configured, which will cause runtime errors in all API methods. The removed error handling should be restored:

Suggested change
def _get_auth_header(self, method: str, url: str, content_type: Optional[str] = None, body: Optional[str] = None) -> Dict[str, str]:
def _get_auth_header(self, method: str, url: str, content_type: Optional[str] = None, body: Optional[str] = None) -> Dict[str, str]:
"""
Generate authentication header for API requests.
Priority order:
1. API KEY authentication (Bearer token) - if configured
2. ACCESS_KEY/SECRET_KEY authentication (Qiniu token) - if API KEY not available
Raises:
ValueError: If neither authentication method is properly configured
"""
# Priority 1: Use API KEY authentication if configured
if self.live_api_key and self.live_api_key != "YOUR_QINIU_LIVE_API_KEY":
return {
"Authorization": f"Bearer {self.live_api_key}"
}
# Priority 2: Fall back to ACCESS_KEY/SECRET_KEY authentication
if self.access_key and self.secret_key and \
self.access_key != "YOUR_QINIU_ACCESS_KEY" and \
self.secret_key != "YOUR_QINIU_SECRET_KEY":
token = self._generate_qiniu_token(method, url, content_type, body)
return {
"Authorization": f"Qiniu {token}"
}
raise ValueError(
"Authentication not configured: Either LIVE_API_KEY or both ACCESS_KEY/SECRET_KEY must be set"
)

Issues:

  • Security: Allows unauthenticated requests to be attempted
  • Reliability: Causes TypeError when headers=None is passed to aiohttp
  • Performance: Fails late with network errors instead of failing fast with clear config errors

"""
Generate authorization header
Priority: QINIU_ACCESS_KEY/QINIU_SECRET_KEY > API KEY
"""
# Priority 1: Use QINIU_ACCESS_KEY/QINIU_SECRET_KEY if configured

# Priority 1: Fall back to API KEY if ACCESS_KEY/SECRET_KEY not configured
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Misleading comment: This says "Fall back to API KEY" but API KEY is checked first (Priority 1), not as a fallback. The comment contradicts the code behavior.

Should be: # Priority 1: Use API KEY authentication if configured

if self.live_api_key and self.live_api_key != "YOUR_QINIU_LIVE_API_KEY":
return {
"Authorization": f"Bearer {self.live_api_key}"
}

# Priority 2: Use QINIU_ACCESS_KEY/QINIU_SECRET_KEY if configured
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Misleading comment: This should say "Fall back to" since ACCESS_KEY/SECRET_KEY is now the secondary authentication method (Priority 2).

Should be: # Priority 2: Fall back to ACCESS_KEY/SECRET_KEY authentication if API KEY not available

if self.access_key and self.secret_key and \
self.access_key != "YOUR_QINIU_ACCESS_KEY" and \
self.secret_key != "YOUR_QINIU_SECRET_KEY":
# Generate Qiniu token for the request
# For live streaming API, we use a simple token format
token = self._generate_qiniu_token(method, url, content_type, body)
# token = generate_signature(method, url, body,self.access_key, self.secret_key)
return {
"Authorization": f"Qiniu {token}"
}

# Priority 2: Fall back to API KEY if ACCESS_KEY/SECRET_KEY not configured
if not self.api_key or self.api_key == "YOUR_QINIU_API_KEY":
raise ValueError("Neither QINIU_ACCESS_KEY/QINIU_SECRET_KEY nor QINIU_API_KEY is configured")

return {
"Authorization": f"Bearer {self.api_key}"
return {
"Authorization": "Qiniu ak:sk"
}

def _build_bucket_url(self, bucket: str) -> str:
Expand Down Expand Up @@ -87,15 +86,35 @@ async def create_bucket(self, bucket: str) -> Dict[str, Any]:
Dict containing the response status and message
"""
url = self._build_bucket_url(bucket)
headers = self._get_auth_header(method="PUT",url=url)

logger.info(f"Creating bucket: {bucket} at {url}")
data = {}
bodyJson = json.dumps(data)
auth_headers = self._get_auth_header(method="PUT", url=url, content_type="application/json", body=bodyJson)
headers = {"Content-Type": "application/json"}
# 如果有认证头,添加到headers中
if auth_headers:
headers.update(auth_headers)

# 打印 HTTP 请求信息
print("=== HTTP 请求信息 ===")
print("方法: PUT")
print(f"URL: {url}")
print("请求头:")
for key, value in headers.items():
print(f" {key}: {value}")
print("请求体: {}")
print("===================")

print(f"Creating bucket: {bucket} at {url}")

async with aiohttp.ClientSession() as session:
async with session.put(url, headers=headers) as response:
async with session.put(url, headers=headers, data=bodyJson) as response:
status = response.status
text = await response.text()

print(f"状态码: {status}")
print(f"响应内容: {text}")
print("==================")

if status == 200 or status == 201:
logger.info(f"Successfully created bucket: {bucket}")
return {
Expand Down Expand Up @@ -127,12 +146,17 @@ async def create_stream(self, bucket: str, stream: str) -> Dict[str, Any]:
Dict containing the response status and message
"""
url = self._build_stream_url(bucket, stream)
headers = self._get_auth_header(method="PUT", url=url)
data = {}
bodyJson = json.dumps(data)
headers = {
**self._get_auth_header(method="PUT", url=url, content_type="application/json", body=bodyJson),
"Content-Type": "application/json"
}

logger.info(f"Creating stream: {stream} in bucket: {bucket} at {url}")

async with aiohttp.ClientSession() as session:
async with session.put(url, headers=headers) as response:
async with session.put(url, headers=headers, data=bodyJson) as response:
status = response.status
text = await response.text()

Expand Down Expand Up @@ -463,7 +487,6 @@ async def list_streams(self, bucket_id: str) -> Dict[str, Any]:
def _generate_qiniu_token(self, method: str, url: str, content_type: Optional[str] = None, body: Optional[str] = None) -> str:
if not self.access_key or not self.secret_key:
raise ValueError("QINIU_ACCESS_KEY and QINIU_SECRET_KEY are required")

# Parse the URL
parsed = urlparse(url)

Expand All @@ -475,7 +498,7 @@ def _generate_qiniu_token(self, method: str, url: str, content_type: Optional[st
data += f"?{parsed.query}"

# 3. Add Host
data += f"\nHost: {parsed.netloc}"
data += f"\nHost: {parsed.hostname}"

# 4. Add Content-Type if exists and not empty
if content_type:
Expand All @@ -500,9 +523,41 @@ def _generate_qiniu_token(self, method: str, url: str, content_type: Optional[st
sign = hmac.new(secret_bytes, data_bytes, hashlib.sha1).digest()

# 8. URL-safe Base64 encode
encoded_sign = base64.urlsafe_b64encode(sign).decode('utf-8')
encoded_pre = base64.b64encode(sign).decode('utf-8')
encoded_sign = encoded_pre.replace('+', '-').replace('/', '_')

# 9. Construct and return the Qiniu token
qiniu_token = f"{self.access_key}:{encoded_sign}"
return qiniu_token






def generate_signature(method, url, body, ak, sk):
parsed_url = urlparse(url)

# 构建签名数据
data = method + " " + parsed_url.path

if parsed_url.query:
data += "?" + parsed_url.query

data += "\nHost: " + parsed_url.hostname
data += "\nContent-Type: application/json"

if body:
data += "\n\n" + body
print(data)
# 使用HMAC-SHA1进行签名
hmac_sha1 = hmac.new(sk.encode('utf-8'), data.encode('utf-8'), hashlib.sha1)
hmac_result = hmac_sha1.digest()

sign = ak + ":" + base64_url_safe_encode(hmac_result)
return sign

return qiniu_token
def base64_url_safe_encode(data):
encoded = base64.b64encode(data).decode('utf-8')
encoded = encoded.replace('+', '-').replace('/', '_')
return encoded
Loading