From 3abd2173e17986a3e1772b2305eac8cf39932fd8 Mon Sep 17 00:00:00 2001 From: Zhang Qianze Date: Sat, 24 Aug 2024 12:56:03 +0800 Subject: [PATCH] feat: use message_collector to replace chat_transcriber - add a new message_collector - replace chat_transcriber and take out cmd_conversion - UI adapt --- agents/property.json | 240 +++--------------- .../extension/message_collector/BUILD.gn | 22 ++ .../extension/message_collector/README.md | 29 +++ .../extension/message_collector/__init__.py | 11 + .../extension/message_collector/manifest.json | 51 ++++ .../extension/message_collector/property.json | 1 + .../message_collector/src/__init__.py | 0 .../extension/message_collector/src/addon.py | 22 ++ .../message_collector/src/extension.py | 150 +++++++++++ .../extension/message_collector/src/log.py | 22 ++ playground/src/manager/rtc/rtc.ts | 77 +++--- 11 files changed, 389 insertions(+), 236 deletions(-) create mode 100644 agents/ten_packages/extension/message_collector/BUILD.gn create mode 100644 agents/ten_packages/extension/message_collector/README.md create mode 100644 agents/ten_packages/extension/message_collector/__init__.py create mode 100644 agents/ten_packages/extension/message_collector/manifest.json create mode 100644 agents/ten_packages/extension/message_collector/property.json create mode 100644 agents/ten_packages/extension/message_collector/src/__init__.py create mode 100644 agents/ten_packages/extension/message_collector/src/addon.py create mode 100644 agents/ten_packages/extension/message_collector/src/extension.py create mode 100644 agents/ten_packages/extension/message_collector/src/log.py diff --git a/agents/property.json b/agents/property.json index 808b159f..bf384a9c 100644 --- a/agents/property.json +++ b/agents/property.json @@ -65,8 +65,8 @@ { "type": "extension", "extension_group": "transcriber", - "addon": "chat_transcriber", - "name": "chat_transcriber" + "addon": "message_collector", + "name": "message_collector" }, { "type": "extension_group", @@ -107,7 +107,7 @@ }, { "extension_group": "transcriber", - "extension": "chat_transcriber" + "extension": "message_collector" } ] } @@ -126,27 +126,7 @@ }, { "extension_group": "transcriber", - "extension": "chat_transcriber", - "cmd_conversions": [ - { - "cmd": { - "type": "per_property", - "keep_original": true, - "rules": [ - { - "path": "is_final", - "type": "fixed_value", - "value": "bool(true)" - }, - { - "path": "stream_id", - "type": "fixed_value", - "value": "uint32(999)" - } - ] - } - } - ] + "extension": "message_collector" } ] } @@ -191,7 +171,7 @@ }, { "extension_group": "transcriber", - "extension": "chat_transcriber", + "extension": "message_collector", "data": [ { "name": "data", @@ -290,8 +270,8 @@ { "type": "extension", "extension_group": "transcriber", - "addon": "chat_transcriber", - "name": "chat_transcriber" + "addon": "message_collector", + "name": "message_collector" }, { "type": "extension_group", @@ -332,7 +312,7 @@ }, { "extension_group": "transcriber", - "extension": "chat_transcriber" + "extension": "message_collector" } ] } @@ -351,27 +331,7 @@ }, { "extension_group": "transcriber", - "extension": "chat_transcriber", - "cmd_conversions": [ - { - "cmd": { - "type": "per_property", - "keep_original": true, - "rules": [ - { - "path": "is_final", - "type": "fixed_value", - "value": "bool(true)" - }, - { - "path": "stream_id", - "type": "fixed_value", - "value": "uint32(999)" - } - ] - } - } - ] + "extension": "message_collector" } ] } @@ -416,7 +376,7 @@ }, { "extension_group": "transcriber", - "extension": "chat_transcriber", + "extension": "message_collector", "data": [ { "name": "data", @@ -674,8 +634,8 @@ { "type": "extension", "extension_group": "chat_transcriber", - "addon": "chat_transcriber_python", - "name": "chat_transcriber" + "addon": "message_collector", + "name": "message_collector" }, { "type": "extension_group", @@ -716,7 +676,7 @@ }, { "extension_group": "chat_transcriber", - "extension": "chat_transcriber" + "extension": "message_collector" } ] } @@ -735,27 +695,7 @@ }, { "extension_group": "chat_transcriber", - "extension": "chat_transcriber", - "cmd_conversions": [ - { - "cmd": { - "type": "per_property", - "keep_original": true, - "rules": [ - { - "path": "is_final", - "type": "fixed_value", - "value": "bool(true)" - }, - { - "path": "stream_id", - "type": "fixed_value", - "value": "uint32(999)" - } - ] - } - } - ] + "extension": "message_collector" } ] } @@ -815,7 +755,7 @@ }, { "extension_group": "chat_transcriber", - "extension": "chat_transcriber", + "extension": "message_collector", "data": [ { "name": "data", @@ -884,8 +824,8 @@ { "type": "extension", "extension_group": "chat_transcriber", - "addon": "chat_transcriber_python", - "name": "chat_transcriber" + "addon": "message_collector", + "name": "message_collector" }, { "type": "extension", @@ -928,7 +868,7 @@ }, { "extension_group": "chat_transcriber", - "extension": "chat_transcriber" + "extension": "message_collector" } ] } @@ -973,27 +913,7 @@ }, { "extension_group": "chat_transcriber", - "extension": "chat_transcriber", - "cmd_conversions": [ - { - "cmd": { - "type": "per_property", - "keep_original": true, - "rules": [ - { - "path": "is_final", - "type": "fixed_value", - "value": "bool(true)" - }, - { - "path": "stream_id", - "type": "fixed_value", - "value": "uint32(999)" - } - ] - } - } - ] + "extension": "message_collector" } ] } @@ -1038,7 +958,7 @@ }, { "extension_group": "chat_transcriber", - "extension": "chat_transcriber", + "extension": "message_collector", "data": [ { "name": "data", @@ -1518,8 +1438,8 @@ { "type": "extension", "extension_group": "transcriber", - "addon": "chat_transcriber", - "name": "chat_transcriber" + "addon": "message_collector", + "name": "message_collector" }, { "type": "extension_group", @@ -1560,7 +1480,7 @@ }, { "extension_group": "transcriber", - "extension": "chat_transcriber" + "extension": "message_collector" } ] } @@ -1590,27 +1510,7 @@ }, { "extension_group": "transcriber", - "extension": "chat_transcriber", - "cmd_conversions": [ - { - "cmd": { - "type": "per_property", - "keep_original": true, - "rules": [ - { - "path": "is_final", - "type": "fixed_value", - "value": "bool(true)" - }, - { - "path": "stream_id", - "type": "fixed_value", - "value": "uint32(999)" - } - ] - } - } - ] + "extension": "message_collector" } ] } @@ -1655,7 +1555,7 @@ }, { "extension_group": "transcriber", - "extension": "chat_transcriber", + "extension": "message_collector", "data": [ { "name": "data", @@ -1752,8 +1652,8 @@ { "type": "extension", "extension_group": "transcriber", - "addon": "chat_transcriber", - "name": "chat_transcriber" + "addon": "message_collector", + "name": "message_collector" }, { "type": "extension_group", @@ -1794,7 +1694,7 @@ }, { "extension_group": "transcriber", - "extension": "chat_transcriber" + "extension": "message_collector" } ] } @@ -1824,27 +1724,7 @@ }, { "extension_group": "transcriber", - "extension": "chat_transcriber", - "cmd_conversions": [ - { - "cmd": { - "type": "per_property", - "keep_original": true, - "rules": [ - { - "path": "is_final", - "type": "fixed_value", - "value": "bool(true)" - }, - { - "path": "stream_id", - "type": "fixed_value", - "value": "uint32(999)" - } - ] - } - } - ] + "extension": "message_collector" } ] } @@ -1889,7 +1769,7 @@ }, { "extension_group": "transcriber", - "extension": "chat_transcriber", + "extension": "message_collector", "data": [ { "name": "data", @@ -1982,8 +1862,8 @@ { "type": "extension", "extension_group": "transcriber", - "addon": "chat_transcriber", - "name": "chat_transcriber" + "addon": "message_collector", + "name": "message_collector" }, { "type": "extension_group", @@ -2024,7 +1904,7 @@ }, { "extension_group": "transcriber", - "extension": "chat_transcriber" + "extension": "message_collector" } ] } @@ -2043,27 +1923,7 @@ }, { "extension_group": "transcriber", - "extension": "chat_transcriber", - "cmd_conversions": [ - { - "cmd": { - "type": "per_property", - "keep_original": true, - "rules": [ - { - "path": "is_final", - "type": "fixed_value", - "value": "bool(true)" - }, - { - "path": "stream_id", - "type": "fixed_value", - "value": "uint32(999)" - } - ] - } - } - ] + "extension": "message_collector" } ] } @@ -2108,7 +1968,7 @@ }, { "extension_group": "transcriber", - "extension": "chat_transcriber", + "extension": "message_collector", "data": [ { "name": "data", @@ -2203,8 +2063,8 @@ { "type": "extension", "extension_group": "chat_transcriber", - "addon": "chat_transcriber_python", - "name": "chat_transcriber" + "addon": "message_collector", + "name": "message_collector" }, { "type": "extension", @@ -2280,7 +2140,7 @@ }, { "extension_group": "chat_transcriber", - "extension": "chat_transcriber" + "extension": "message_collector" } ] } @@ -2356,27 +2216,7 @@ }, { "extension_group": "chat_transcriber", - "extension": "chat_transcriber", - "cmd_conversions": [ - { - "cmd": { - "type": "per_property", - "keep_original": true, - "rules": [ - { - "path": "is_final", - "type": "fixed_value", - "value": "bool(true)" - }, - { - "path": "stream_id", - "type": "fixed_value", - "value": "uint32(999)" - } - ] - } - } - ] + "extension": "message_collector" } ] } @@ -2452,7 +2292,7 @@ }, { "extension_group": "chat_transcriber", - "extension": "chat_transcriber", + "extension": "message_collector", "data": [ { "name": "data", diff --git a/agents/ten_packages/extension/message_collector/BUILD.gn b/agents/ten_packages/extension/message_collector/BUILD.gn new file mode 100644 index 00000000..555cf959 --- /dev/null +++ b/agents/ten_packages/extension/message_collector/BUILD.gn @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2022-11. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import("//build/feature/ten_package.gni") + +ten_package("message_collector") { + package_kind = "extension" + + resources = [ + "__init__.py", + "manifest.json", + "property.json", + "src/__init__.py", + "src/addon.py", + "src/extension.py", + "src/log.py", + ] +} diff --git a/agents/ten_packages/extension/message_collector/README.md b/agents/ten_packages/extension/message_collector/README.md new file mode 100644 index 00000000..c5d6664d --- /dev/null +++ b/agents/ten_packages/extension/message_collector/README.md @@ -0,0 +1,29 @@ +# message_collector + + + +## Features + + + +- xxx feature + +## API + +Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). + + + +## Development + +### Build + + + +### Unit test + + + +## Misc + + diff --git a/agents/ten_packages/extension/message_collector/__init__.py b/agents/ten_packages/extension/message_collector/__init__.py new file mode 100644 index 00000000..46f01a81 --- /dev/null +++ b/agents/ten_packages/extension/message_collector/__init__.py @@ -0,0 +1,11 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from .src import addon +from .src.log import logger + +logger.info("message_collector extension loaded") diff --git a/agents/ten_packages/extension/message_collector/manifest.json b/agents/ten_packages/extension/message_collector/manifest.json new file mode 100644 index 00000000..c4004695 --- /dev/null +++ b/agents/ten_packages/extension/message_collector/manifest.json @@ -0,0 +1,51 @@ +{ + "type": "extension", + "name": "message_collector", + "version": "0.1.0", + "dependencies": [ + { + "type": "system", + "name": "ten_runtime_python", + "version": "0.1.0" + } + ], + "package": { + "include": [ + "manifest.json", + "property.json", + "BUILD.gn", + "**.tent", + "**.py", + "src/**.tent", + "src/**.py", + "README.md" + ] + }, + "api": { + "property": {}, + "data_in": [ + { + "name": "text_data", + "property": { + "text": { + "type": "string" + }, + "is_final": { + "type": "bool" + }, + "stream_id": { + "type": "uint32" + }, + "end_of_segment": { + "type": "bool" + } + } + } + ], + "data_out": [ + { + "name": "data" + } + ] + } +} \ No newline at end of file diff --git a/agents/ten_packages/extension/message_collector/property.json b/agents/ten_packages/extension/message_collector/property.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/agents/ten_packages/extension/message_collector/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agents/ten_packages/extension/message_collector/src/__init__.py b/agents/ten_packages/extension/message_collector/src/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/agents/ten_packages/extension/message_collector/src/addon.py b/agents/ten_packages/extension/message_collector/src/addon.py new file mode 100644 index 00000000..ce0629df --- /dev/null +++ b/agents/ten_packages/extension/message_collector/src/addon.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from ten import ( + Addon, + register_addon_as_extension, + TenEnv, +) +from .extension import MessageCollectorExtension +from .log import logger + + +@register_addon_as_extension("message_collector") +class MessageCollectorExtensionAddon(Addon): + + def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None: + logger.info("MessageCollectorExtensionAddon on_create_instance") + ten_env.on_create_instance_done(MessageCollectorExtension(name), context) diff --git a/agents/ten_packages/extension/message_collector/src/extension.py b/agents/ten_packages/extension/message_collector/src/extension.py new file mode 100644 index 00000000..e8e903f7 --- /dev/null +++ b/agents/ten_packages/extension/message_collector/src/extension.py @@ -0,0 +1,150 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import json +import time +from ten import ( + AudioFrame, + VideoFrame, + Extension, + TenEnv, + Cmd, + StatusCode, + CmdResult, + Data, +) +from .log import logger + + + +CMD_NAME_FLUSH = "flush" + +TEXT_DATA_TEXT_FIELD = "text" +TEXT_DATA_FINAL_FIELD = "is_final" +TEXT_DATA_STREAM_ID_FIELD = "stream_id" +TEXT_DATA_END_OF_SEGMENT_FIELD = "end_of_segment" + +# record the cached text data for each stream id +cached_text_map = {} + + +class MessageCollectorExtension(Extension): + def on_init(self, ten_env: TenEnv) -> None: + logger.info("MessageCollectorExtension on_init") + ten_env.on_init_done() + + def on_start(self, ten_env: TenEnv) -> None: + logger.info("MessageCollectorExtension on_start") + + # TODO: read properties, initialize resources + + ten_env.on_start_done() + + def on_stop(self, ten_env: TenEnv) -> None: + logger.info("MessageCollectorExtension on_stop") + + # TODO: clean up resources + + ten_env.on_stop_done() + + def on_deinit(self, ten_env: TenEnv) -> None: + logger.info("MessageCollectorExtension on_deinit") + ten_env.on_deinit_done() + + def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: + cmd_name = cmd.get_name() + logger.info("on_cmd name {}".format(cmd_name)) + + # TODO: process cmd + + cmd_result = CmdResult.create(StatusCode.OK) + ten_env.return_result(cmd_result, cmd) + + def on_data(self, ten_env: TenEnv, data: Data) -> None: + """ + on_data receives data from ten graph. + current suppotend data: + - name: text_data + example: + {"name": "text_data", "properties": {"text": "hello", "is_final": true, "stream_id": 123, "end_of_segment": true}} + """ + logger.info(f"on_data") + + text = "" + final = True + stream_id = 0 + end_of_segment = False + + try: + text = data.get_property_string(TEXT_DATA_TEXT_FIELD) + except Exception as e: + logger.exception( + f"on_data get_property_string {TEXT_DATA_TEXT_FIELD} error: {e}" + ) + + try: + final = data.get_property_bool(TEXT_DATA_FINAL_FIELD) + except Exception as e: + logger.exception( + f"on_data get_property_bool {TEXT_DATA_FINAL_FIELD} error: {e}" + ) + + try: + stream_id = data.get_property_int(TEXT_DATA_STREAM_ID_FIELD) + except Exception as e: + logger.exception( + f"on_data get_property_int {TEXT_DATA_STREAM_ID_FIELD} error: {e}" + ) + + try: + end_of_segment = data.get_property_bool(TEXT_DATA_END_OF_SEGMENT_FIELD) + except Exception as e: + logger.exception( + f"on_data get_property_bool {TEXT_DATA_END_OF_SEGMENT_FIELD} error: {e}" + ) + + logger.debug( + f"on_data {TEXT_DATA_TEXT_FIELD}: {text} {TEXT_DATA_FINAL_FIELD}: {final} {TEXT_DATA_STREAM_ID_FIELD}: {stream_id} {TEXT_DATA_END_OF_SEGMENT_FIELD}: {end_of_segment}" + ) + + # We cache all final text data and append the non-final text data to the cached data + # until the end of the segment. + if end_of_segment: + if stream_id in cached_text_map: + text = cached_text_map[stream_id] + text + del cached_text_map[stream_id] + else: + if final: + if stream_id in cached_text_map: + text = cached_text_map[stream_id] + text + + cached_text_map[stream_id] = text + + msg_data = json.dumps({ + "text": text, + "is_final": end_of_segment, + "stream_id": stream_id, + "data_type": "transcribe", + "text_ts": int(time.time() * 1000), # Convert to milliseconds + }) + + try: + # convert the origin text data to the protobuf data and send it to the graph. + ten_data = Data.create("data") + ten_data.set_property_buf("data", msg_data.encode()) + ten_env.send_data(ten_data) + except Exception as e: + logger.warning(f"on_data new_data error: {e}") + return + + def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: + # TODO: process pcm frame + pass + + def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: + # TODO: process image frame + pass diff --git a/agents/ten_packages/extension/message_collector/src/log.py b/agents/ten_packages/extension/message_collector/src/log.py new file mode 100644 index 00000000..ff7a400f --- /dev/null +++ b/agents/ten_packages/extension/message_collector/src/log.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import logging + +logger = logging.getLogger("message_collector") +logger.setLevel(logging.INFO) + +formatter_str = ( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - " + "[%(filename)s:%(lineno)d] - %(message)s" +) +formatter = logging.Formatter(formatter_str) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler) diff --git a/playground/src/manager/rtc/rtc.ts b/playground/src/manager/rtc/rtc.ts index 3b6617b3..0439be78 100644 --- a/playground/src/manager/rtc/rtc.ts +++ b/playground/src/manager/rtc/rtc.ts @@ -109,46 +109,51 @@ export class RtcManager extends AGEventEmitter { private _praseData(data: any): ITextItem | void { // @ts-ignore - const textstream = protoRoot.Agora.SpeechToText.lookup("Text").decode(data) - if (!textstream) { - return console.warn("Prase data failed.") - } + // const textstream = protoRoot.Agora.SpeechToText.lookup("Text").decode(data) + // if (!textstream) { + // return console.warn("Prase data failed.") + // } + let decoder = new TextDecoder('utf-8') + let decodedMessage = decoder.decode(data) + + const textstream = JSON.parse(decodedMessage) + console.log("[test] textstream raw data", JSON.stringify(textstream)) - const { dataType, words, uid, culture, time, durationMs, textTs, trans } = textstream + const { stream_id, is_final, text, text_ts, data_type } = textstream let textStr: string = "" let isFinal = false const textItem: ITextItem = {} as ITextItem - textItem.uid = uid - textItem.time = textTs - switch (dataType) { - case "transcribe": - words.forEach((word: any) => { - textStr += word.text - if (word.isFinal) { - isFinal = true - } - }) - textItem.dataType = "transcribe" - textItem.language = culture - textItem.text = textStr - textItem.isFinal = isFinal - this.emit("textChanged", textItem) - break - // case "translate": - // if (!trans?.length) { - // return - // } - // trans.forEach((transItem: any) => { - // textStr = transItem.texts.join("") - // isFinal = !!transItem.isFinal - // textItem.dataType = "translate" - // textItem.language = transItem.lang - // textItem.isFinal = isFinal - // textItem.text = textStr - // this.emit("textChanged", textItem) - // }) - // break - } + textItem.uid = stream_id + textItem.time = text_ts + // switch (dataType) { + // case "transcribe": + // words.forEach((word: any) => { + // textStr += word.text + // if (word.isFinal) { + // isFinal = true + // } + // }) + textItem.dataType = "transcribe" + // textItem.language = culture + textItem.text = text + textItem.isFinal = is_final + this.emit("textChanged", textItem) + // break + // case "translate": + // if (!trans?.length) { + // return + // } + // trans.forEach((transItem: any) => { + // textStr = transItem.texts.join("") + // isFinal = !!transItem.isFinal + // textItem.dataType = "translate" + // textItem.language = transItem.lang + // textItem.isFinal = isFinal + // textItem.text = textStr + // this.emit("textChanged", textItem) + // }) + // break + // } }