From 265063abb74904ea1480b4fca3e86eb8bbfd3702 Mon Sep 17 00:00:00 2001 From: xiayanji Date: Fri, 21 Nov 2025 20:39:02 +0800 Subject: [PATCH 1/3] update miku mcp tools --- .../core/live_streaming/live_streaming.py | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/src/mcp_server/core/live_streaming/live_streaming.py b/src/mcp_server/core/live_streaming/live_streaming.py index 2f2b0a6..fc0a10e 100644 --- a/src/mcp_server/core/live_streaming/live_streaming.py +++ b/src/mcp_server/core/live_streaming/live_streaming.py @@ -16,18 +16,21 @@ class LiveStreamingService: def __init__(self, cfg: config.Config = None): self.config = cfg - self.api_key = cfg.live_api_key if cfg else None + self.live_api_key = cfg.live_api_key if cfg else None self.live_endpoint = cfg.live_endpoint if cfg else "mls.cn-east-1.qiniumiku.com" self.access_key = cfg.access_key if cfg else None self.secret_key = cfg.secret_key if cfg else None def _get_auth_header(self, method: str, url: str, content_type: Optional[str] = None, body: Optional[str] = None) -> Dict[str, str]: - """ - Generate authorization header - Priority: QINIU_ACCESS_KEY/QINIU_SECRET_KEY > API KEY - """ - # Priority 1: Use QINIU_ACCESS_KEY/QINIU_SECRET_KEY if configured + + # Priority 1: Fall back to API KEY if ACCESS_KEY/SECRET_KEY not configured + if self.live_api_key and self.live_api_key != "YOUR_QINIU_LIVE_API_KEY": + return { + "Authorization": f"Bearer {self.live_api_key}" + } + + # Priority 2: Use QINIU_ACCESS_KEY/QINIU_SECRET_KEY if configured if self.access_key and self.secret_key and \ self.access_key != "YOUR_QINIU_ACCESS_KEY" and \ self.secret_key != "YOUR_QINIU_SECRET_KEY": @@ -38,14 +41,6 @@ def _get_auth_header(self, method: str, url: str, content_type: Optional[str] = "Authorization": f"Qiniu {token}" } - # Priority 2: Fall back to API KEY if ACCESS_KEY/SECRET_KEY not configured - if not self.api_key or self.api_key == "YOUR_QINIU_API_KEY": - raise ValueError("Neither QINIU_ACCESS_KEY/QINIU_SECRET_KEY nor QINIU_API_KEY is configured") - - return { - "Authorization": f"Bearer {self.api_key}" - } - def _build_bucket_url(self, bucket: str) -> str: """Build S3-style bucket URL""" if not self.live_endpoint: @@ -468,7 +463,7 @@ def _generate_qiniu_token(self, method: str, url: str, content_type: Optional[st parsed = urlparse(url) # 1. Add Method and Path - data = f"{method} {parsed.path or '/'}" + data = f"{method} {parsed.path}" # 2. Add Query if exists if parsed.query: From 70aa1e40dd0e1ba8477557259b5ed742579f84ff Mon Sep 17 00:00:00 2001 From: xiayanji Date: Sat, 22 Nov 2025 08:56:58 +0800 Subject: [PATCH 2/3] update miku mcp tools --- .../core/live_streaming/live_streaming.py | 83 ++++++++++++++++--- 1 file changed, 72 insertions(+), 11 deletions(-) diff --git a/src/mcp_server/core/live_streaming/live_streaming.py b/src/mcp_server/core/live_streaming/live_streaming.py index fc0a10e..2bbc6de 100644 --- a/src/mcp_server/core/live_streaming/live_streaming.py +++ b/src/mcp_server/core/live_streaming/live_streaming.py @@ -37,9 +37,13 @@ def _get_auth_header(self, method: str, url: str, content_type: Optional[str] = # Generate Qiniu token for the request # For live streaming API, we use a simple token format token = self._generate_qiniu_token(method, url, content_type, body) + # token = generate_signature(method, url, body,self.access_key, self.secret_key) return { "Authorization": f"Qiniu {token}" } + return { + "Authorization": f"Qiniu ak:sk" + } def _build_bucket_url(self, bucket: str) -> str: """Build S3-style bucket URL""" @@ -82,15 +86,36 @@ async def create_bucket(self, bucket: str) -> Dict[str, Any]: Dict containing the response status and message """ url = self._build_bucket_url(bucket) - headers = self._get_auth_header(method="PUT",url=url) - - logger.info(f"Creating bucket: {bucket} at {url}") + data = {} + bodyJson = json.dumps(data) + auth_headers = self._get_auth_header(method="PUT", url=url, content_type="application/json", body=bodyJson) + headers = {"Content-Type": "application/json"} + # 如果有认证头,添加到headers中 + if auth_headers: + headers.update(auth_headers) + + # 打印 HTTP 请求信息 + print("=== HTTP 请求信息 ===") + print(f"方法: PUT") + print(f"URL: {url}") + print("请求头:") + for key, value in headers.items(): + print(f" {key}: {value}") + print("请求体: {}") + print("===================") + + print(f"Creating bucket: {bucket} at {url}") async with aiohttp.ClientSession() as session: - async with session.put(url, headers=headers) as response: + async with session.put(url, headers=headers, data=bodyJson) as response: status = response.status text = await response.text() + print(f"=== HTTP 响应信息 ===") + print(f"状态码: {status}") + print(f"响应内容: {text}") + print("==================") + if status == 200 or status == 201: logger.info(f"Successfully created bucket: {bucket}") return { @@ -122,12 +147,17 @@ async def create_stream(self, bucket: str, stream: str) -> Dict[str, Any]: Dict containing the response status and message """ url = self._build_stream_url(bucket, stream) - headers = self._get_auth_header(method="PUT", url=url) + data = {} + bodyJson = json.dumps(data) + headers = { + **self._get_auth_header(method="PUT", url=url, content_type="application/json", body=bodyJson), + "Content-Type": "application/json" + } logger.info(f"Creating stream: {stream} in bucket: {bucket} at {url}") async with aiohttp.ClientSession() as session: - async with session.put(url, headers=headers) as response: + async with session.put(url, headers=headers, data=bodyJson) as response: status = response.status text = await response.text() @@ -458,19 +488,18 @@ async def list_streams(self, bucket_id: str) -> Dict[str, Any]: def _generate_qiniu_token(self, method: str, url: str, content_type: Optional[str] = None, body: Optional[str] = None) -> str: if not self.access_key or not self.secret_key: raise ValueError("QINIU_ACCESS_KEY and QINIU_SECRET_KEY are required") - # Parse the URL parsed = urlparse(url) # 1. Add Method and Path - data = f"{method} {parsed.path}" + data = f"{method} {parsed.path or '/'}" # 2. Add Query if exists if parsed.query: data += f"?{parsed.query}" # 3. Add Host - data += f"\nHost: {parsed.netloc}" + data += f"\nHost: {parsed.hostname}" # 4. Add Content-Type if exists and not empty if content_type: @@ -495,9 +524,41 @@ def _generate_qiniu_token(self, method: str, url: str, content_type: Optional[st sign = hmac.new(secret_bytes, data_bytes, hashlib.sha1).digest() # 8. URL-safe Base64 encode - encoded_sign = base64.urlsafe_b64encode(sign).decode('utf-8') + encoded_pre = base64.b64encode(sign).decode('utf-8') + encoded_sign = encoded_pre.replace('+', '-').replace('/', '_') # 9. Construct and return the Qiniu token qiniu_token = f"{self.access_key}:{encoded_sign}" + return qiniu_token + + + + + + +def generate_signature(method, url, body, ak, sk): + parsed_url = urlparse(url) + + # 构建签名数据 + data = method + " " + parsed_url.path + + if parsed_url.query: + data += "?" + parsed_url.query + + data += "\nHost: " + parsed_url.hostname + data += "\nContent-Type: application/json" + + if body: + data += "\n\n" + body + print(data) + # 使用HMAC-SHA1进行签名 + hmac_sha1 = hmac.new(sk.encode('utf-8'), data.encode('utf-8'), hashlib.sha1) + hmac_result = hmac_sha1.digest() + + sign = ak + ":" + base64_url_safe_encode(hmac_result) + return sign - return qiniu_token \ No newline at end of file +def base64_url_safe_encode(data): + encoded = base64.b64encode(data).decode('utf-8') + encoded = encoded.replace('+', '-').replace('/', '_') + return encoded From 782fd306b8c7d65f38059452e7f7df6f4884be56 Mon Sep 17 00:00:00 2001 From: xiayanji Date: Sat, 22 Nov 2025 09:04:51 +0800 Subject: [PATCH 3/3] update miku mcp tools --- src/mcp_server/core/live_streaming/live_streaming.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/mcp_server/core/live_streaming/live_streaming.py b/src/mcp_server/core/live_streaming/live_streaming.py index 2bbc6de..ce37858 100644 --- a/src/mcp_server/core/live_streaming/live_streaming.py +++ b/src/mcp_server/core/live_streaming/live_streaming.py @@ -42,7 +42,7 @@ def _get_auth_header(self, method: str, url: str, content_type: Optional[str] = "Authorization": f"Qiniu {token}" } return { - "Authorization": f"Qiniu ak:sk" + "Authorization": "Qiniu ak:sk" } def _build_bucket_url(self, bucket: str) -> str: @@ -96,7 +96,7 @@ async def create_bucket(self, bucket: str) -> Dict[str, Any]: # 打印 HTTP 请求信息 print("=== HTTP 请求信息 ===") - print(f"方法: PUT") + print("方法: PUT") print(f"URL: {url}") print("请求头:") for key, value in headers.items(): @@ -111,7 +111,6 @@ async def create_bucket(self, bucket: str) -> Dict[str, Any]: status = response.status text = await response.text() - print(f"=== HTTP 响应信息 ===") print(f"状态码: {status}") print(f"响应内容: {text}") print("==================")