Skip to content

Commit

Permalink
chore(): use the ten_env logging module uniformly
Browse files Browse the repository at this point in the history
  • Loading branch information
sunshinexcode committed Dec 26, 2024
1 parent 264f550 commit df670b3
Show file tree
Hide file tree
Showing 76 changed files with 592 additions and 900 deletions.
60 changes: 26 additions & 34 deletions agents/ten_packages/extension/agora_rtm_wrapper/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ package extension

import (
"encoding/json"
"log/slog"
"fmt"
"strconv"

"ten_framework/ten"
Expand Down Expand Up @@ -46,10 +46,6 @@ type RtcUserSate struct {
Reason string `json:"reason"` // 原因
}

var (
logTag = slog.String("extension", "AGORA_RTM_WRAPPER_EXTENSION")
)

type agoraRtmWrapperExtension struct {
ten.DefaultExtension
}
Expand All @@ -65,14 +61,14 @@ func (p *agoraRtmWrapperExtension) OnData(
) {
buf, err := data.GetPropertyBytes("data")
if err != nil {
slog.Error("OnData GetProperty data error: " + err.Error())
tenEnv.LogError("OnData GetProperty data error: " + err.Error())
return
}
slog.Info("AGORA_RTM_WRAPPER_EXTENSION OnData: "+string(buf), logTag)
tenEnv.LogInfo("AGORA_RTM_WRAPPER_EXTENSION OnData: " + string(buf))
colllectorMessage := ColllectorMessage{}
err = json.Unmarshal(buf, &colllectorMessage)
if err != nil {
slog.Error("OnData Unmarshal data error: " + err.Error())
tenEnv.LogError("OnData Unmarshal data error: " + err.Error())
return
}

Expand All @@ -85,70 +81,70 @@ func (p *agoraRtmWrapperExtension) OnData(
}
jsonBytes, err := json.Marshal(message)
if err != nil {
slog.Error("failed to marshal JSON: " + err.Error())
tenEnv.LogError("failed to marshal JSON: " + err.Error())
return
}
slog.Info("AGORA_RTM_WRAPPER_EXTENSION OnData: "+string(jsonBytes), logTag)
tenEnv.LogInfo("AGORA_RTM_WRAPPER_EXTENSION OnData: " + string(jsonBytes))

cmd, _ := ten.NewCmd("publish")

err = cmd.SetPropertyBytes("message", jsonBytes)
if err != nil {
slog.Error("failed to set property message: " + err.Error())
tenEnv.LogError("failed to set property message: " + err.Error())
return
}
if err := tenEnv.SendCmd(cmd, func(_ ten.TenEnv, result ten.CmdResult) {
slog.Info("AGORA_RTM_WRAPPER_EXTENSION publish result " + result.ToJSON())
if err := tenEnv.SendCmd(cmd, func(_ ten.TenEnv, result ten.CmdResult, _ error) {
status, err := result.GetStatusCode()
tenEnv.LogInfo(fmt.Sprintf("AGORA_RTM_WRAPPER_EXTENSION publish result %d", status))
if status != ten.StatusCodeOk || err != nil {
slog.Error("failed to subscribe ")
tenEnv.LogError("failed to subscribe")
}
}); err != nil {
slog.Error("failed to send command " + err.Error())
tenEnv.LogError("failed to send command " + err.Error())
}
}

func (p *agoraRtmWrapperExtension) OnCmd(tenEnv ten.TenEnv, cmd ten.Cmd) {
defer func() {
if r := recover(); r != nil {
slog.Error("OnCmd panic", "recover", r)
tenEnv.LogError(fmt.Sprintf("OnCmd panic: %v", r))
}
cmdResult, err := ten.NewCmdResult(ten.StatusCodeOk)
if err != nil {
slog.Error("failed to create cmd result", "err", err)
tenEnv.LogError(fmt.Sprintf("failed to create cmd result: %v", err))
return
}
tenEnv.ReturnResult(cmdResult, cmd)
tenEnv.ReturnResult(cmdResult, cmd, nil)
}()
cmdName, err := cmd.GetName()
if err != nil {
slog.Error("failed to get cmd name", "err", err)
tenEnv.LogError(fmt.Sprintf("failed to get cmd name: %v", err))
return
}
slog.Info(cmd.ToJSON(), logTag)
tenEnv.LogInfo(fmt.Sprintf("received command: %s", cmdName))
switch cmdName {
case "on_user_audio_track_state_changed":
// on_user_audio_track_state_changed
p.handleUserStateChanged(tenEnv, cmd)
default:
slog.Warn("unsupported cmd", "cmd", cmdName)
tenEnv.LogWarn(fmt.Sprintf("unsupported cmd: %s", cmdName))
}
}

func (p *agoraRtmWrapperExtension) handleUserStateChanged(tenEnv ten.TenEnv, cmd ten.Cmd) {
remoteUserID, err := cmd.GetPropertyString("remote_user_id")
if err != nil {
slog.Error("failed to get remote_user_id", "err", err)
tenEnv.LogError(fmt.Sprintf("failed to get remote_user_id: %v", err))
return
}
state, err := cmd.GetPropertyInt32("state")
if err != nil {
slog.Error("failed to get state", "err", err)
tenEnv.LogError(fmt.Sprintf("failed to get state: %v", err))
return
}
reason, err := cmd.GetPropertyInt32("reason")
if err != nil {
slog.Error("failed to get reason", "err", err)
tenEnv.LogError(fmt.Sprintf("failed to get reason: %v", err))
return
}
userState := RtcUserSate{
Expand All @@ -158,28 +154,24 @@ func (p *agoraRtmWrapperExtension) handleUserStateChanged(tenEnv ten.TenEnv, cmd
}
jsonBytes, err := json.Marshal(userState)
if err != nil {
slog.Error("failed to marshal JSON: " + err.Error())
tenEnv.LogError("failed to marshal JSON: " + err.Error())
return
}
sendCmd, _ := ten.NewCmd("set_presence_state")
sendCmd.SetPropertyString("states", string(jsonBytes))
cmdStr := sendCmd.ToJSON()
slog.Info("AGORA_RTM_WRAPPER_EXTENSION SetRtmPresenceState " + cmdStr)
if err := tenEnv.SendCmd(sendCmd, func(_ ten.TenEnv, result ten.CmdResult) {
slog.Info("AGORA_RTM_WRAPPER_EXTENSION SetRtmPresenceState result " + result.ToJSON())
tenEnv.LogInfo("AGORA_RTM_WRAPPER_EXTENSION SetRtmPresenceState " + string(jsonBytes))
if err := tenEnv.SendCmd(sendCmd, func(_ ten.TenEnv, result ten.CmdResult, _ error) {
status, err := result.GetStatusCode()
tenEnv.LogInfo(fmt.Sprintf("AGORA_RTM_WRAPPER_EXTENSION SetRtmPresenceState result %d", status))
if status != ten.StatusCodeOk || err != nil {
panic("failed to SetRtmPresenceState ")
panic("failed to SetRtmPresenceState")
}
}); err != nil {
slog.Error("failed to send command " + err.Error())
tenEnv.LogError("failed to send command " + err.Error())
}

}

func init() {
slog.Info("agora_rtm_wrapper extension init", logTag)

// Register addon
ten.RegisterAddonAsExtension(
"agora_rtm_wrapper",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
from . import vector_storage_addon
from .log import logger

logger.info("aliyun_analyticdb_vector_storage extension loaded")
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
# -*- coding: utf-8 -*-

try:
from .log import logger
except ImportError:
from log import logger
import asyncio
import threading
from typing import Coroutine
Expand All @@ -16,7 +12,7 @@

# maybe need multiple clients
class AliGPDBClient:
def __init__(self, access_key_id, access_key_secret, endpoint):
def __init__(self, ten_env, access_key_id, access_key_secret, endpoint):
self.stopEvent = asyncio.Event()
self.loop = None
self.tasks = asyncio.Queue()
Expand All @@ -28,6 +24,7 @@ def __init__(self, access_key_id, access_key_secret, endpoint):
target=asyncio.run, args=(self.__thread_routine(),)
)
self.thread.start()
self.ten_env = ten_env

async def stop_thread(self):
self.stopEvent.set()
Expand All @@ -50,7 +47,7 @@ def close(self):
self.thread.join()

async def __thread_routine(self):
logger.info("client __thread_routine start")
self.ten_env.log_info("client __thread_routine start")
self.loop = asyncio.get_running_loop()
tasks = set()
while not self.stopEvent.is_set():
Expand All @@ -68,11 +65,11 @@ async def __thread_routine(self):
)
for task in done:
if task.exception():
logger.error(f"task exception: {task.exception()}")
self.ten_env.log_error(f"task exception: {task.exception()}")
future.set_exception(task.exception())
else:
await asyncio.sleep(0.1)
logger.info("client __thread_routine end")
self.ten_env.log_info("client __thread_routine end")

async def submit_task(self, coro: Coroutine) -> Future:
future = Future()
Expand Down

This file was deleted.

Loading

0 comments on commit df670b3

Please sign in to comment.