Skip to content

Commit

Permalink
Feat/refactor exts (#276)
Browse files Browse the repository at this point in the history
* feat: openai extension refactoring

* feat: adding refactor code / async.io

* feat: fix refactoring bugs

* fix: add manifest.json

* feat: add queue logic

* fix: fix issues
- remove test code
- prevent sending full content again
- add queue logic

* feat: fix parseSentence

* fix: fix end_segment bug

* feat: add chatflow abstraction
- chatflow
- refactor to simplify flow run
- added event emitter for intermediate execution

* feat: refactor openai, support multi data-stream data pack

* feat: finalize openai extension refactoring
- change asyncio.queue to AsyncQueue
- change the way we abstract chatflow
- use eventEmitter for easier tool notification
- use queue to ensure tasks are processed one by one and are cancellable

* feat: add docs

* feat: don't use private api
  • Loading branch information
plutoless authored Sep 17, 2024
1 parent 681d29e commit e5d8f04
Show file tree
Hide file tree
Showing 21 changed files with 1,020 additions and 70 deletions.
3 changes: 1 addition & 2 deletions agents/manifest-lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@
"type": "system",
"name": "nlohmann_json",
"version": "3.11.2",
"hash": "72b15822c7ea9deef5e7ad96216ac55e93f11b00466dd1943afd5ee276e99d19",
"supports": []
"hash": "72b15822c7ea9deef5e7ad96216ac55e93f11b00466dd1943afd5ee276e99d19"
},
{
"type": "system",
Expand Down
4 changes: 4 additions & 0 deletions agents/ten_packages/bak/openai_chatgpt_python/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Package initializer for the (bak) openai_chatgpt_python extension.
# Importing openai_chatgpt_addon here runs that module's import-time side
# effects (presumably addon registration with the runtime — confirm in addon source).
from . import openai_chatgpt_addon
from .log import logger

# Emitted once at import time so logs confirm the extension package was loaded.
logger.info("openai_chatgpt_python extension loaded")
13 changes: 13 additions & 0 deletions agents/ten_packages/bak/openai_chatgpt_python/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""Logger configuration for the openai_chatgpt_python extension.

Exposes a module-level ``logger`` that writes to stderr with a format
including the process id and source location, which makes interleaved
multi-process agent logs traceable.
"""
import logging

logger = logging.getLogger("openai_chatgpt_python")
logger.setLevel(logging.INFO)

formatter = logging.Formatter(
    "%(asctime)s - %(name)s - %(levelname)s - %(process)d - [%(filename)s:%(lineno)d] - %(message)s"
)

console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)

# Guard against duplicate handlers: if this module is imported again under a
# different package path, an unconditional addHandler() would make every
# record be emitted multiple times.
if not logger.handlers:
    logger.addHandler(console_handler)
93 changes: 93 additions & 0 deletions agents/ten_packages/bak/openai_chatgpt_python/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
{
"type": "extension",
"name": "openai_chatgpt_python",
"version": "0.4.0",
"dependencies": [
{
"type": "system",
"name": "ten_runtime_python",
"version": "0.2"
}
],
"api": {
"property": {
"api_key": {
"type": "string"
},
"frequency_penalty": {
"type": "float64"
},
"presence_penalty": {
"type": "float64"
},
"temperature": {
"type": "float64"
},
"top_p": {
"type": "float64"
},
"model": {
"type": "string"
},
"max_tokens": {
"type": "int64"
},
"base_url": {
"type": "string"
},
"prompt": {
"type": "string"
},
"greeting": {
"type": "string"
},
"checking_vision_text_items": {
"type": "string"
},
"proxy_url": {
"type": "string"
},
"max_memory_length": {
"type": "int64"
},
"enable_tools": {
"type": "bool"
}
},
"data_in": [
{
"name": "text_data",
"property": {
"text": {
"type": "string"
}
}
}
],
"data_out": [
{
"name": "text_data",
"property": {
"text": {
"type": "string"
}
}
}
],
"cmd_in": [
{
"name": "flush"
}
],
"cmd_out": [
{
"name": "flush"
}
],
"video_frame_in": [
{
"name": "video_frame"
}
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
openai
numpy
requests
pillow
# NOTE: "asyncio" removed from the install list — it is part of the Python
# standard library; the PyPI package of that name is an obsolete backport.
79 changes: 65 additions & 14 deletions agents/ten_packages/extension/message_collector/src/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#
import json
import time
import uuid
from ten import (
AudioFrame,
VideoFrame,
Expand All @@ -19,7 +20,8 @@
)
from .log import logger


MAX_SIZE = 800 # 800-byte cap per outgoing packet (kept safely under 1 KB)
OVERHEAD_ESTIMATE = 200 # Estimate for the overhead of metadata in the JSON

CMD_NAME_FLUSH = "flush"

Expand Down Expand Up @@ -89,16 +91,12 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None:
try:
final = data.get_property_bool(TEXT_DATA_FINAL_FIELD)
except Exception as e:
logger.warning(
f"on_data get_property_bool {TEXT_DATA_FINAL_FIELD} error: {e}"
)
pass

try:
stream_id = data.get_property_int(TEXT_DATA_STREAM_ID_FIELD)
except Exception as e:
logger.warning(
f"on_data get_property_int {TEXT_DATA_STREAM_ID_FIELD} error: {e}"
)
pass

try:
end_of_segment = data.get_property_bool(TEXT_DATA_END_OF_SEGMENT_FIELD)
Expand All @@ -124,19 +122,72 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None:

cached_text_map[stream_id] = text

msg_data = json.dumps({
"text": text,
# Generate a unique message ID for this batch of parts
message_id = str(uuid.uuid4())

# Prepare the main JSON structure without the text field
base_msg_data = {
"is_final": end_of_segment,
"stream_id": stream_id,
"message_id": message_id, # Add message_id to identify the split message
"data_type": "transcribe",
"text_ts": int(time.time() * 1000), # Convert to milliseconds
})
}

try:
# convert the origin text data to the protobuf data and send it to the graph.
ten_data = Data.create("data")
ten_data.set_property_buf("data", msg_data.encode())
ten_env.send_data(ten_data)
# Convert the text to UTF-8 bytes
text_bytes = text.encode('utf-8')

# If the text + metadata fits within the size limit, send it directly
if len(text_bytes) + OVERHEAD_ESTIMATE <= MAX_SIZE:
base_msg_data["text"] = text
msg_data = json.dumps(base_msg_data)
ten_data = Data.create("data")
ten_data.set_property_buf("data", msg_data.encode())
ten_env.send_data(ten_data)
else:
# Split the text bytes into smaller chunks, ensuring safe UTF-8 splitting
max_text_size = MAX_SIZE - OVERHEAD_ESTIMATE
total_length = len(text_bytes)
total_parts = (total_length + max_text_size - 1) // max_text_size # Calculate number of parts

def get_valid_utf8_chunk(start, end):
    """Return (text, end) for the largest cleanly-decodable slice text_bytes[start:end].

    Walks ``end`` backwards one byte at a time until the slice decodes as
    valid UTF-8, so a chunk never ends in the middle of a multi-byte
    character. ``start`` is assumed to sit on a character boundary (it is
    always a previous chunk's returned ``end``).
    """
    while end > start:
        try:
            # Decode to check if this chunk is valid UTF-8
            text_part = text_bytes[start:end].decode('utf-8')
            return text_part, end
        except UnicodeDecodeError:
            # Reduce the end point to avoid splitting in the middle of a character
            end -= 1
    # If no valid chunk is found (shouldn't happen with valid UTF-8 input), return an empty string
    return "", start

part_number = 0
start_index = 0
while start_index < total_length:
part_number += 1
# Get a valid UTF-8 chunk
text_part, end_index = get_valid_utf8_chunk(start_index, min(start_index + max_text_size, total_length))

# Prepare the part data with metadata
part_data = base_msg_data.copy()
part_data.update({
"text": text_part,
"part_number": part_number,
"total_parts": total_parts,
})

# Send each part
part_msg_data = json.dumps(part_data)
ten_data = Data.create("data")
ten_data.set_property_buf("data", part_msg_data.encode())
ten_env.send_data(ten_data)

# Move to the next chunk
start_index = end_index

except Exception as e:
logger.warning(f"on_data new_data error: {e}")
return
Expand Down
21 changes: 21 additions & 0 deletions agents/ten_packages/extension/openai_chatgpt_python/BUILD.gn
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
#
# Agora Real Time Engagement
# Created by Wei Hu in 2022-11.
# Copyright (c) 2024 Agora IO. All rights reserved.
#
#
import("//build/feature/ten_package.gni")

# Bundle the Python extension as a TEN package so the runtime can discover
# and load it by name.
ten_package("openai_chatgpt_python") {
  package_kind = "extension"

  # Files shipped inside the package; paths are relative to this BUILD file.
  resources = [
    "__init__.py",
    "addon.py",
    "extension.py",
    "log.py",
    "manifest.json",
    "property.json",
  ]
}
60 changes: 60 additions & 0 deletions agents/ten_packages/extension/openai_chatgpt_python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# openai_chatgpt_python

An extension for integrating OpenAI's GPT models (e.g., GPT-4) into your application, providing configurable AI-driven features such as conversational agents, task automation, and tool integration.

## Features

<!-- main features introduction -->

- OpenAI GPT Integration: Leverage GPT models for text processing and conversational tasks.
- Configurable: Easily customize API keys, model settings, prompts, temperature, etc.
- Async Queue Processing: Supports real-time message processing with task cancellation and prioritization.
- Tool Support: Integrate external tools like image recognition via OpenAI's API.

## API

Refer to `api` definition in [manifest.json](manifest.json) and default values in [property.json](property.json).

<!-- Additional API.md can be referred to if extra introduction needed -->

| **Property** | **Type** | **Description** |
|----------------------------|------------|-------------------------------------------|
| `api_key` | `string` | API key for authenticating with OpenAI |
| `frequency_penalty` | `float64` | Controls how much to penalize new tokens based on their existing frequency in the text so far |
| `presence_penalty` | `float64` | Controls how much to penalize new tokens based on whether they appear in the text so far |
| `temperature` | `float64` | Sampling temperature, higher values mean more randomness |
| `top_p` | `float64` | Nucleus sampling, chooses tokens with cumulative probability `p` |
| `model` | `string` | Model identifier (e.g., GPT-3.5, GPT-4) |
| `max_tokens` | `int64` | Maximum number of tokens to generate |
| `base_url` | `string` | API base URL |
| `prompt` | `string` | Default prompt to send to the model |
| `greeting` | `string` | Greeting message to be used |
| `checking_vision_text_items`| `string` | Items for checking vision-based text responses |
| `proxy_url` | `string` | URL of the proxy server |
| `max_memory_length` | `int64` | Maximum memory length for processing |
| `enable_tools` | `bool` | Flag to enable or disable external tools |

### Data In:
| **Name** | **Property** | **Type** | **Description** |
|----------------|--------------|------------|-------------------------------|
| `text_data` | `text` | `string` | Incoming text data |

### Data Out:
| **Name** | **Property** | **Type** | **Description** |
|----------------|--------------|------------|-------------------------------|
| `text_data` | `text` | `string` | Outgoing text data |

### Command In:
| **Name** | **Description** |
|----------------|---------------------------------------------|
| `flush` | Command to flush the current processing state |

### Command Out:
| **Name** | **Description** |
|----------------|---------------------------------------------|
| `flush` | Response after flushing the current state |

### Video Frame In:
| **Name** | **Description** |
|------------------|-------------------------------------------|
| `video_frame` | Video frame input for vision processing |
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
from . import openai_chatgpt_addon
#
#
# Agora Real Time Engagement
# Created by Wei Hu in 2024-08.
# Copyright (c) 2024 Agora IO. All rights reserved.
#
#
from . import addon
from .log import logger

logger.info("openai_chatgpt_python extension loaded")
22 changes: 22 additions & 0 deletions agents/ten_packages/extension/openai_chatgpt_python/addon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
#
# Agora Real Time Engagement
# Created by Wei Hu in 2024-08.
# Copyright (c) 2024 Agora IO. All rights reserved.
#
#
from ten import (
Addon,
register_addon_as_extension,
TenEnv,
)
from .extension import OpenAIChatGPTExtension
from .log import logger


@register_addon_as_extension("openai_chatgpt_python")
class OpenAIChatGPTExtensionAddon(Addon):
    """Addon entry point through which the TEN runtime instantiates the
    openai_chatgpt_python extension."""

    def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None:
        """Build an OpenAIChatGPTExtension and hand it back to the runtime."""
        logger.info("OpenAIChatGPTExtensionAddon on_create_instance")
        extension = OpenAIChatGPTExtension(name)
        ten_env.on_create_instance_done(extension, context)
Loading

0 comments on commit e5d8f04

Please sign in to comment.